Posted to commits@impala.apache.org by dh...@apache.org on 2018/05/21 20:26:58 UTC

[1/2] impala git commit: IMPALA-6941: load more text scanner compression plugins

Repository: impala
Updated Branches:
  refs/heads/2.x fe57b2009 -> 75d19c874


IMPALA-6941: load more text scanner compression plugins

Add extensions for LZ4 and ZSTD (which are supported by Hadoop).
Even without a plugin this results in better behaviour, because
we no longer treat files with these (previously unknown) extensions
as uncompressed text.
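
As a minimal sketch of the resulting behaviour (a C++ rendering of the
Java suffix map added to HdfsCompression.java below; the names here are
illustrative, not Impala's actual API):

    #include <algorithm>
    #include <map>
    #include <string>

    // Hypothetical codec tags covering the suffixes recognised after this change.
    enum class TextCodec { NONE, GZIP, BZIP2, SNAPPY, LZO, LZ4, ZSTD };

    // Resolve a file's compression from its suffix. Unknown suffixes fall back
    // to NONE, i.e. the file is treated as uncompressed text.
    TextCodec CodecFromFileName(const std::string& filename) {
      static const std::map<std::string, TextCodec> kSuffixMap = {
          {"gz", TextCodec::GZIP},       {"bz2", TextCodec::BZIP2},
          {"snappy", TextCodec::SNAPPY}, {"lzo", TextCodec::LZO},
          {"lz4", TextCodec::LZ4},       {"zst", TextCodec::ZSTD}};  // lz4/zst are new
      size_t dot = filename.rfind('.');
      if (dot == std::string::npos) return TextCodec::NONE;
      std::string suffix = filename.substr(dot + 1);
      std::transform(suffix.begin(), suffix.end(), suffix.begin(), ::tolower);
      auto it = kSuffixMap.find(suffix);
      return it == kSuffixMap.end() ? TextCodec::NONE : it->second;
    }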

Also allow loading tables containing files with unsupported
compression types. Previously there was inconsistent behaviour
when we recognised the file extension but did not support
querying the table: the catalog would load the table, but the
impalad would fail to process the catalog update. The simplest
fix is to allow loading such tables.

Similarly, make the "LOAD DATA" operation more permissive -
we can copy files into a directory even if we can't
decompress them.

Switch to always checking the plugin version - running a
mismatched plugin is inherently unsafe.
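
For reference, a sketch of what a hypothetical "FOO" plugin library
(libimpalafoo.so) would export, following the symbol contract documented
in hdfs-plugin-text-scanner.h below. The version string and the use of C
linkage are assumptions for illustration, not taken from a real plugin:

    #include <vector>

    #include "common/status.h"             // Impala headers the plugin builds against
    #include "exec/hdfs-scan-node-base.h"
    #include "exec/hdfs-scanner.h"

    // C linkage assumed so the plain symbol names can be resolved by the
    // dynamic lookup in LoadPluginLibrary().
    extern "C" {

    // Compared (strcmp) against the running daemon's build version; after this
    // change a mismatch is always an error (-skip_lzo_version_check is now a
    // deprecated no-op). A real plugin bakes this string in at its own build time.
    const char* GetImpalaBuildVersion() { return "2.13.0-SNAPSHOT"; /* hypothetical */ }

    // Issues the initial scan ranges for FOO-compressed text files.
    impala::Status FooIssueInitialRangesImpl(
        impala::HdfsScanNodeBase* scan_node,
        const std::vector<impala::HdfsFileDesc*>& files);

    // Creates the scanner that decompresses FOO data and hands the bytes to
    // Impala's builtin delimited-text parser.
    impala::HdfsScanner* CreateFooTextScanner(
        impala::HdfsScanNodeBase* scan_node, impala::RuntimeState* state);

    }  // extern "C"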

Testing:
The positive case, where the LZO plugin loads successfully, is
already exercised. Added coverage for the negative case where the
plugin is disabled.

Fixed test gaps:
* Querying LZO table with LZO plugin not available.
* Interacting with tables with known but unsupported text
  compressions.
* Querying files with unknown compression suffixes (which are
  treated as uncompressed text).

Change-Id: If2a9c4a4a11bed81df706e9e834400bfedfe48e6
Reviewed-on: http://gerrit.cloudera.org:8080/10165
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10462


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c3bc72bd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c3bc72bd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c3bc72bd

Branch: refs/heads/2.x
Commit: c3bc72bda89755a7ac3a952df08cdf3d62b7caf9
Parents: fe57b20
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Apr 19 14:43:03 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat May 19 23:08:33 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   2 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            | 117 --------------
 be/src/exec/hdfs-lzo-text-scanner.h             |  64 --------
 be/src/exec/hdfs-plugin-text-scanner.cc         | 152 +++++++++++++++++++
 be/src/exec/hdfs-plugin-text-scanner.h          |  93 ++++++++++++
 be/src/exec/hdfs-scan-node-base.cc              |  16 +-
 be/src/exec/hdfs-text-scanner.cc                |  47 +++---
 be/src/exec/hdfs-text-scanner.h                 |  15 ++
 common/fbs/CatalogObjects.fbs                   |   3 +-
 .../apache/impala/analysis/LoadDataStmt.java    |  11 --
 .../apache/impala/catalog/HdfsCompression.java  |  18 ++-
 .../apache/impala/catalog/HdfsFileFormat.java   |  39 -----
 .../apache/impala/catalog/HdfsPartition.java    |  11 --
 .../queries/QueryTest/disable-lzo-plugin.test   |   7 +
 .../unsupported-compression-partitions.test     |  28 ++++
 tests/custom_cluster/test_scanner_plugin.py     |  34 +++++
 tests/metadata/test_partition_metadata.py       |  82 +++++++++-
 17 files changed, 459 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 0317cfe..2349df4 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -53,8 +53,8 @@ add_library(Exec
   hdfs-avro-scanner.cc
   hdfs-avro-table-writer.cc
   hdfs-avro-scanner-ir.cc
+  hdfs-plugin-text-scanner.cc
   hdfs-text-scanner.cc
-  hdfs-lzo-text-scanner.cc
   hdfs-text-table-writer.cc
   hdfs-sequence-table-writer.cc
   hdfs-parquet-scanner.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
deleted file mode 100644
index 8af89f2..0000000
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ /dev/null
@@ -1,117 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-lzo-text-scanner.h"
-
-#include <hdfs.h>
-#include <boost/algorithm/string.hpp>
-#include "common/version.h"
-#include "exec/hdfs-scan-node.h"
-#include "exec/read-write-util.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "util/debug-util.h"
-#include "util/hdfs-util.h"
-#include "util/dynamic-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-DEFINE_bool(skip_lzo_version_check, false, "Disables checking the LZO library version "
-            "against the running Impala version.");
-
-const string HdfsLzoTextScanner::LIB_IMPALA_LZO = "libimpalalzo.so";
-
-namespace impala {
-Status HdfsLzoTextScanner::library_load_status_;
-
-SpinLock HdfsLzoTextScanner::lzo_load_lock_;
-
-const char* (*GetImpalaLzoBuildVersion)();
-
-HdfsScanner* (*HdfsLzoTextScanner::CreateLzoTextScanner)(
-    HdfsScanNodeBase* scan_node, RuntimeState* state);
-
-Status (*HdfsLzoTextScanner::LzoIssueInitialRanges)(
-    HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
-
-HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
-    HdfsScanNodeBase* scan_node, RuntimeState* state) {
-
-  // If the scanner was not loaded then no scans could be issued so we should
-  // never get here without having loaded the scanner.
-  DCHECK(CreateLzoTextScanner != NULL);
-
-  return (*CreateLzoTextScanner)(scan_node, state);
-}
-
-Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
-    const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
-  if (LzoIssueInitialRanges == NULL) {
-    lock_guard<SpinLock> l(lzo_load_lock_);
-    if (library_load_status_.ok()) {
-      // LzoIssueInitialRanges && library_load_status_.ok() means we haven't tried loading
-      // the library yet.
-      library_load_status_ = LoadLzoLibrary();
-      if (!library_load_status_.ok()) {
-        stringstream ss;
-        ss << "Error loading impala-lzo library. Check that the impala-lzo library "
-           << "is at version " << GetDaemonBuildVersion();
-        library_load_status_.AddDetail(ss.str());
-        return library_load_status_;
-      }
-    } else {
-      // We only try to load the library once.
-      return library_load_status_;
-    }
-  }
-
-  return (*LzoIssueInitialRanges)(scan_node, files);
-}
-
-Status HdfsLzoTextScanner::LoadLzoLibrary() {
-  void* handle;
-  RETURN_IF_ERROR(DynamicOpen(LIB_IMPALA_LZO.c_str(), &handle));
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "GetImpalaBuildVersion", reinterpret_cast<void**>(&GetImpalaLzoBuildVersion)));
-
-  if (strcmp((*GetImpalaLzoBuildVersion)(), GetDaemonBuildVersion()) != 0) {
-    stringstream ss;
-    ss << "Impala LZO library was built against Impala version "
-       << (*GetImpalaLzoBuildVersion)() << ", but the running Impala version is "
-       << GetDaemonBuildVersion();
-    if (FLAGS_skip_lzo_version_check) {
-      LOG(ERROR) << ss.str();
-    } else {
-      return Status(ss.str());
-    }
-  }
-
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "CreateLzoTextScanner", reinterpret_cast<void**>(&CreateLzoTextScanner)));
-  RETURN_IF_ERROR(DynamicLookup(handle,
-      "LzoIssueInitialRangesImpl", reinterpret_cast<void**>(&LzoIssueInitialRanges)));
-
-  DCHECK(CreateLzoTextScanner != NULL);
-  DCHECK(LzoIssueInitialRanges != NULL);
-  LOG(INFO) << "Loaded impala-lzo library: " << LIB_IMPALA_LZO;
-  return Status::OK();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-lzo-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.h b/be/src/exec/hdfs-lzo-text-scanner.h
deleted file mode 100644
index d6bddf9..0000000
--- a/be/src/exec/hdfs-lzo-text-scanner.h
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_LZO_TEXT_SCANNER_H
-#define IMPALA_EXEC_HDFS_LZO_TEXT_SCANNER_H
-
-#include "common/status.h"
-#include "exec/scan-node.h"
-#include "exec/hdfs-scanner.h"
-#include "exec/hdfs-scan-node-base.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-/// This is a wrapper for calling the external HdfsLzoTextScanner
-/// The LZO scanner class is implemented in a dynamically linked library so that
-/// Impala does not include GPL code.  The two entry points are:
-/// IssueInitialRanges -- issue calls to the I/O manager to read the file headers
-/// GetHdfsLzoTextScanner -- returns a pointer to the Scanner object.
-class HdfsLzoTextScanner {
- public:
-  static HdfsScanner* GetHdfsLzoTextScanner(HdfsScanNodeBase* scan_node,
-      RuntimeState* state);
-  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files);
-
- private:
-  /// Impala LZO library name -- GPL code.
-  const static std::string LIB_IMPALA_LZO;
-
-  /// If non-OK, then we have tried and failed to load the LZO library.
-  static Status library_load_status_;
-
-  /// Lock to protect loading of the lzo file library.
-  static SpinLock lzo_load_lock_;
-
-  /// Dynamically linked function to create the Lzo Scanner Object.
-  static HdfsScanner* (*CreateLzoTextScanner)
-      (HdfsScanNodeBase* scan_node, RuntimeState* state);
-
-  /// Dynamically linked function to set the initial scan ranges.
-  static Status (*LzoIssueInitialRanges)(
-      HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
-
-  /// Dynamically loads CreateLzoTextScanner and LzoIssueInitialRanges.
-  /// lzo_load_lock_ should be taken before calling this method.
-  static Status LoadLzoLibrary();
-};
-}
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-plugin-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-plugin-text-scanner.cc b/be/src/exec/hdfs-plugin-text-scanner.cc
new file mode 100644
index 0000000..d28fab5
--- /dev/null
+++ b/be/src/exec/hdfs-plugin-text-scanner.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-plugin-text-scanner.h"
+
+#include <algorithm>
+
+#include <hdfs.h>
+#include <boost/algorithm/string.hpp>
+#include "common/version.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/read-write-util.h"
+#include "runtime/runtime-state.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "util/debug-util.h"
+#include "util/hdfs-util.h"
+#include "util/dynamic-util.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+using boost::algorithm::to_lower_copy;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::upgrade_lock;
+using boost::upgrade_to_unique_lock;
+using std::find;
+
+// Allow LZO by default to maintain backwards compatibility. We can add more options
+// if we determine that the plugins are well-maintained and generally stable.
+DEFINE_string(enabled_hdfs_text_scanner_plugins, "LZO", "(Advanced) whitelist of HDFS "
+    "text scanner plugins that Impala will try to dynamically load. Must be a "
+    "comma-separated list of upper-case compression codec names. Each plugin implements "
+    "support for decompression and hands off the decompressed bytes to Impala's builtin "
+    "text parser for further processing (e.g. parsing delimited text).");
+
+DEFINE_bool_hidden(skip_lzo_version_check, false, "Deprecated - has no effect.");
+
+static const string LIB_IMPALA_TEMPLATE = "libimpala$0.so";
+
+namespace impala {
+
+shared_mutex HdfsPluginTextScanner::library_load_lock_;
+
+std::unordered_map<string, HdfsPluginTextScanner::LoadedPlugin>
+    HdfsPluginTextScanner::loaded_plugins_;
+
+HdfsScanner* HdfsPluginTextScanner::GetHdfsPluginTextScanner(
+    HdfsScanNodeBase* scan_node, RuntimeState* state, const string& plugin_name) {
+  CreateScannerFn create_scanner_fn;
+  {
+    shared_lock<shared_mutex> l(library_load_lock_);
+    // If the scanner was not loaded then no scans could be issued so we should
+    // never get here without having loaded the scanner.
+    auto it = loaded_plugins_.find(plugin_name);
+    DCHECK(it != loaded_plugins_.end());
+    create_scanner_fn = it->second.create_scanner_fn;
+  }
+  return create_scanner_fn(scan_node, state);
+}
+
+Status HdfsPluginTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files, const string& plugin_name) {
+  DCHECK(!files.empty());
+  IssueInitialRangesFn issue_initial_ranges_fn;
+  RETURN_IF_ERROR(CheckPluginEnabled(plugin_name));
+  {
+    upgrade_lock<shared_mutex> read_lock(library_load_lock_);
+    auto it = loaded_plugins_.find(plugin_name);
+    if (it == loaded_plugins_.end()) {
+      // We haven't tried loading the library yet.
+      upgrade_to_unique_lock<shared_mutex> write_lock(read_lock);
+      it = loaded_plugins_.insert(make_pair(plugin_name, LoadedPlugin())).first;
+      it->second.library_load_status = LoadPluginLibrary(plugin_name, &it->second);
+      if (!it->second.library_load_status.ok()) {
+        it->second.library_load_status.AddDetail(Substitute(
+              "Error loading plugin library for $0. Check that the library is at "
+              "version $1", plugin_name, GetDaemonBuildVersion()));
+        return it->second.library_load_status;
+      }
+    } else {
+      // We only try to load the library once - propagate the error if it previously
+      // failed.
+      RETURN_IF_ERROR(it->second.library_load_status);
+    }
+    issue_initial_ranges_fn = it->second.issue_initial_ranges_fn;
+  }
+
+  return issue_initial_ranges_fn(scan_node, files);
+}
+
+Status HdfsPluginTextScanner::CheckPluginEnabled(const string& plugin_name) {
+  vector<string> enabled_plugins;
+  boost::split(enabled_plugins, FLAGS_enabled_hdfs_text_scanner_plugins,
+      boost::is_any_of(","));
+  if (find(enabled_plugins.begin(), enabled_plugins.end(), plugin_name)
+      == enabled_plugins.end()) {
+    return Status(Substitute("Scanner plugin '$0' is not one of the enabled plugins: '$1'",
+          plugin_name, FLAGS_enabled_hdfs_text_scanner_plugins));
+  }
+  return Status::OK();
+}
+
+Status HdfsPluginTextScanner::LoadPluginLibrary(const string& plugin_name,
+    LoadedPlugin* plugin) {
+  RETURN_IF_ERROR(CheckPluginEnabled(plugin_name));
+  GetPluginImpalaBuildVersionFn get_plugin_impala_build_version;
+  void* handle;
+  string lib_name = Substitute(LIB_IMPALA_TEMPLATE, to_lower_copy(plugin_name));
+  RETURN_IF_ERROR(DynamicOpen(lib_name.c_str(), &handle));
+  RETURN_IF_ERROR(DynamicLookup(handle, "GetImpalaBuildVersion",
+      reinterpret_cast<void**>(&get_plugin_impala_build_version)));
+  if (strcmp(get_plugin_impala_build_version(), GetDaemonBuildVersion()) != 0) {
+    return Status(Substitute(
+        "Scanner plugin $0 was built against Impala version $1 but the running Impala "
+        "version is $2", plugin_name, get_plugin_impala_build_version(),
+        GetDaemonBuildVersion()));
+  }
+
+  // Camel case the plugin name to generate the correct symbols, e.g. "CreateFooTextScanner".
+  string plugin_camelcase = to_lower_copy(plugin_name);
+  plugin_camelcase[0] = toupper(plugin_camelcase[0]);
+  string create_symbol = Substitute("Create$0TextScanner", plugin_camelcase);
+  string issue_initial_ranges_symbol =
+        Substitute("$0IssueInitialRangesImpl", plugin_camelcase);
+  RETURN_IF_ERROR(DynamicLookup(handle, create_symbol.c_str(),
+        reinterpret_cast<void**>(&plugin->create_scanner_fn)));
+  RETURN_IF_ERROR(DynamicLookup(handle, issue_initial_ranges_symbol.c_str(),
+        reinterpret_cast<void**>(&plugin->issue_initial_ranges_fn)));
+
+  DCHECK(plugin->create_scanner_fn != nullptr);
+  DCHECK(plugin->issue_initial_ranges_fn != nullptr);
+  LOG(INFO) << "Loaded plugin library for " << plugin_name << ": " << lib_name;
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-plugin-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-plugin-text-scanner.h b/be/src/exec/hdfs-plugin-text-scanner.h
new file mode 100644
index 0000000..4b00165
--- /dev/null
+++ b/be/src/exec/hdfs-plugin-text-scanner.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_HDFS_PLUGIN_TEXT_SCANNER_H
+#define IMPALA_EXEC_HDFS_PLUGIN_TEXT_SCANNER_H
+
+#include <boost/thread/shared_mutex.hpp>
+
+#include "common/status.h"
+#include "exec/scan-node.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node-base.h"
+
+namespace impala {
+
+/// This is a wrapper for calling external implementations of text scanners for
+/// compression formats that Impala does not have builtin support for.
+/// The plugin scanners are implemented in dynamically linked libraries.
+///
+/// The two entry points are:
+/// IssueInitialRanges -- issue calls to the I/O manager to read the file headers
+/// GetHdfsPluginTextScanner -- returns a pointer to the Scanner object.
+///
+/// Plugin names should all be upper case. If the plugin name is FOO, then the plugin
+/// library must be called "libimpalafoo.so" and it must contain the following exported
+/// functions:
+///   const char* GetImpalaBuildVersion();
+///
+///   Status FooIssueInitialRangesImpl(HdfsScanNodeBase*,
+///                                const std::vector<HdfsFileDesc*>&);
+///
+///   HdfsScanner* CreateFooTextScanner(HdfsScanNodeBase*, RuntimeState*);
+///
+class HdfsPluginTextScanner {
+ public:
+  static HdfsScanner* GetHdfsPluginTextScanner(HdfsScanNodeBase* scan_node,
+      RuntimeState* state, const std::string& plugin_name);
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
+     const std::vector<HdfsFileDesc*>& files, const std::string& plugin_name);
+
+ private:
+  // Typedefs for functions loaded from plugin shared objects.
+  typedef const char* (*GetPluginImpalaBuildVersionFn)();
+  typedef HdfsScanner* (*CreateScannerFn)
+      (HdfsScanNodeBase* scan_node, RuntimeState* state);
+  typedef Status (*IssueInitialRangesFn)(
+      HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
+
+  struct LoadedPlugin {
+    /// If non-OK, then we have tried and failed to load this plugin.
+    Status library_load_status;
+
+    /// Dynamically linked function to create the Scanner Object.
+    CreateScannerFn create_scanner_fn = nullptr;
+
+    /// Dynamically linked function to issue the initial scan ranges.
+    IssueInitialRangesFn issue_initial_ranges_fn = nullptr;
+  };
+
+  /// Lock to protect loading of libraries and 'loaded_plugins_'. We only allow loading a
+  /// single library at a time. Must be held in shared mode when accessing
+  /// 'loaded_plugins_' and exclusive mode when loading a library.
+  static boost::shared_mutex library_load_lock_;
+
+  /// Map from upper case plugin name to the loaded plugin.
+  /// Protected by 'library_load_lock_'. Entries are never removed once loaded.
+  static std::unordered_map<std::string, LoadedPlugin> loaded_plugins_;
+
+  /// Return an error if the specified plugin isn't enabled.
+  static Status CheckPluginEnabled(const std::string& plugin_name);
+
+  /// Dynamically loads the required functions for the plugin identified by 'plugin_name'
+  /// and populates 'create_scanner_fn' and 'issue_initial_ranges_fn' in 'plugin'.
+  /// Returns an error if an error is encountered with loading the library or the
+  /// functions. 'library_load_lock_' must be held by the caller in exclusive mode.
+  static Status LoadPluginLibrary(const std::string& plugin_name, LoadedPlugin* plugin);
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 51ea9f3..00f3af1 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -16,9 +16,10 @@
 // under the License.
 
 #include "exec/hdfs-scan-node-base.h"
+
+#include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/base-sequence-scanner.h"
 #include "exec/hdfs-text-scanner.h"
-#include "exec/hdfs-lzo-text-scanner.h"
 #include "exec/hdfs-sequence-scanner.h"
 #include "exec/hdfs-rcfile-scanner.h"
 #include "exec/hdfs-avro-scanner.h"
@@ -641,12 +642,15 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
   // Create a new scanner for this file format and compression.
   switch (partition->file_format()) {
     case THdfsFileFormat::TEXT:
-      // Lzo-compressed text files are scanned by a scanner that it is implemented as a
-      // dynamic library, so that Impala does not include GPL code.
-      if (compression == THdfsCompression::LZO) {
-        scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_));
-      } else {
+      if (HdfsTextScanner::HasBuiltinSupport(compression)) {
         scanner->reset(new HdfsTextScanner(this, runtime_state_));
+      } else {
+        // No builtin support - we must have loaded the plugin in IssueInitialRanges().
+        auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression);
+        DCHECK(it != _THdfsCompression_VALUES_TO_NAMES.end())
+            << "Already issued ranges for this compression type.";
+        scanner->reset(HdfsPluginTextScanner::GetHdfsPluginTextScanner(
+            this, runtime_state_, it->second));
       }
       break;
     case THdfsFileFormat::SEQUENCE_FILE:

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 1b08c97..3e4c223 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -22,7 +22,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exec/delimited-text-parser.h"
 #include "exec/delimited-text-parser.inline.h"
-#include "exec/hdfs-lzo-text-scanner.h"
+#include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/scanner-context.inline.h"
 #include "exec/text-converter.h"
@@ -77,7 +77,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   vector<ScanRange*> compressed_text_scan_ranges;
   int compressed_text_files = 0;
-  vector<HdfsFileDesc*> lzo_text_files;
+  map<string, vector<HdfsFileDesc*>> plugin_text_files;
   for (int i = 0; i < files.size(); ++i) {
     THdfsCompression::type compression = files[i]->file_compression;
     switch (compression) {
@@ -124,35 +124,36 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         }
         break;
 
-      case THdfsCompression::LZO:
-        // lzo-compressed text need to be processed by the specialized HdfsLzoTextScanner.
-        // Note that any LZO_INDEX files (no matter what the case of their suffix) will be
-        // filtered by the planner.
-        {
-        #ifndef NDEBUG
-          // No straightforward way to do this in one line inside a DCHECK, so for once
-          // we'll explicitly use NDEBUG to avoid executing debug-only code.
-          string lower_filename = files[i]->filename;
-          to_lower(lower_filename);
-          DCHECK(!ends_with(lower_filename, LZO_INDEX_SUFFIX));
-        #endif
-          lzo_text_files.push_back(files[i]);
+      default: {
+        // Other compression formats are only supported by a plugin.
+        auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression);
+        if (it == _THdfsCompression_VALUES_TO_NAMES.end()) {
+          return Status(Substitute(
+                "Unexpected compression enum value: $0", static_cast<int>(compression)));
         }
-        break;
-
-      default:
-        DCHECK(false);
+#ifndef NDEBUG
+        // Note any LZO_INDEX files (no matter what the case of their suffix) should be
+        // filtered by the planner.
+        // No straightforward way to do this in one line inside a DCHECK, so for once
+        // we'll explicitly use NDEBUG to avoid executing debug-only code.
+        string lower_filename = files[i]->filename;
+        to_lower(lower_filename);
+        DCHECK(!ends_with(lower_filename, LZO_INDEX_SUFFIX));
+#endif
+        plugin_text_files[it->second].push_back(files[i]);
+      }
     }
   }
   if (compressed_text_scan_ranges.size() > 0) {
     RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
         compressed_text_files));
   }
-  if (lzo_text_files.size() > 0) {
-    // This will dlopen the lzo binary and can fail if the lzo binary is not present.
-    RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));
+  for (const auto& entry : plugin_text_files) {
+    DCHECK_GT(entry.second.size(), 0) << "List should be non-empty";
+    // This can fail if the plugin library can't be loaded.
+    RETURN_IF_ERROR(HdfsPluginTextScanner::IssueInitialRanges(
+          scan_node, entry.second, entry.first));
   }
-
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 25886ba..5b15372 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -65,6 +65,21 @@ class HdfsTextScanner : public HdfsScanner {
       const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT;
 
+  /// Return true if we have builtin support for scanning text files compressed with this
+  /// codec.
+  static bool HasBuiltinSupport(THdfsCompression::type compression) {
+    switch (compression) {
+      case THdfsCompression::NONE:
+      case THdfsCompression::GZIP:
+      case THdfsCompression::SNAPPY:
+      case THdfsCompression::SNAPPY_BLOCKED:
+      case THdfsCompression::BZIP2:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   /// Suffix for lzo index files.
   const static std::string LZO_INDEX_SUFFIX;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/common/fbs/CatalogObjects.fbs
----------------------------------------------------------------------
diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
index bf44380..c08099d 100644
--- a/common/fbs/CatalogObjects.fbs
+++ b/common/fbs/CatalogObjects.fbs
@@ -29,7 +29,8 @@ enum FbCompression: byte {
   SNAPPY_BLOCKED,
   LZO,
   LZ4,
-  ZLIB
+  ZLIB,
+  ZSTD
 }
 
 table FbFileBlock {

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 114862e..ddc0c6e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -223,17 +223,6 @@ public class LoadDataStmt extends StatementBase {
         }
       }
       Preconditions.checkNotNull(partition);
-
-      // Verify the files being loaded are supported.
-      for (FileStatus fStatus: fs.listStatus(source)) {
-        if (fs.isDirectory(fStatus.getPath())) continue;
-        StringBuilder errorMsg = new StringBuilder();
-        HdfsFileFormat fileFormat = partition.getInputFormatDescriptor().getFileFormat();
-        if (!fileFormat.isFileCompressionTypeSupported(fStatus.getPath().toString(),
-          errorMsg)) {
-          throw new AnalysisException(errorMsg.toString());
-        }
-      }
     } catch (FileNotFoundException e) {
       throw new AnalysisException("File not found: " + e.getMessage(), e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index dd81587..23282c3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -27,10 +27,11 @@ import com.google.common.collect.ImmutableMap;
  * Support for recognizing compression suffixes on data files.
  * Compression of a file is recognized in mapreduce by looking for suffixes of
  * supported codecs.
- * For now Impala supports LZO, GZIP, SNAPPY, and BZIP2. LZO can use the specific HIVE
- * input class.
+ * For now Impala supports LZO, GZIP, SNAPPY, BZIP2 and some additional formats if plugins
+ * are available. Even if a plugin is available, we need to add the file suffixes here so
+ * that we can resolve the compression type from the file name. LZO can use the specific
+ * HIVE input class.
  */
-// TODO: Add LZ4?
 public enum HdfsCompression {
   NONE,
   DEFLATE,
@@ -38,7 +39,9 @@ public enum HdfsCompression {
   BZIP2,
   SNAPPY,
   LZO,
-  LZO_INDEX; //Lzo index file.
+  LZO_INDEX, //Lzo index file.
+  LZ4,
+  ZSTD;
 
   /* Map from a suffix to a compression type */
   private static final ImmutableMap<String, HdfsCompression> SUFFIX_MAP =
@@ -49,6 +52,8 @@ public enum HdfsCompression {
           put("snappy", SNAPPY).
           put("lzo", LZO).
           put("index", LZO_INDEX).
+          put("lz4", LZ4).
+          put("zst", ZSTD).
           build();
 
   /* Given a file name return its compression type, if any. */
@@ -71,6 +76,8 @@ public enum HdfsCompression {
     case BZIP2: return THdfsCompression.BZIP2;
     case SNAPPY: return THdfsCompression.SNAPPY_BLOCKED;
     case LZO: return THdfsCompression.LZO;
+    case LZ4: return THdfsCompression.LZ4;
+    case ZSTD: return THdfsCompression.ZSTD;
     default: throw new IllegalStateException("Unexpected codec: " + this);
     }
   }
@@ -83,13 +90,14 @@ public enum HdfsCompression {
       case BZIP2: return FbCompression.BZIP2;
       case SNAPPY: return FbCompression.SNAPPY;
       case LZO: return FbCompression.LZO;
+      case LZ4: return FbCompression.LZ4;
+      case ZSTD: return FbCompression.ZSTD;
       default: throw new IllegalStateException("Unexpected codec: " + this);
     }
   }
 
   /* Returns a compression type based on (Hive's) intput format. Special case for LZO. */
   public static HdfsCompression fromHdfsInputFormatClass(String inputFormatClass) {
-    // TODO: Remove when we have the native LZO writer.
     Preconditions.checkNotNull(inputFormatClass);
     if (inputFormatClass.equals(HdfsFileFormat.LZO_TEXT.inputFormat())) {
       return LZO;

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 32cae72..46c2b87 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -201,45 +201,6 @@ public enum HdfsFileFormat {
     }
   }
 
-  /*
-   * Checks whether a file is supported in Impala based on the file extension.
-   * Returns true if the file format is supported. If the file format is not
-   * supported, then it returns false and 'errorMsg' contains details on the
-   * incompatibility.
-   *
-   * Impala supports LZO, GZIP, SNAPPY and BZIP2 on text files for partitions that have
-   * been declared in the metastore as TEXT. LZO files can have their own input format.
-   * For now, raise an error on any other type.
-   */
-  public boolean isFileCompressionTypeSupported(String fileName,
-      StringBuilder errorMsg) {
-    // Check to see if the file has a compression suffix.
-    // TODO: Add LZ4
-    HdfsCompression compressionType = HdfsCompression.fromFileName(fileName);
-    switch (compressionType) {
-      case LZO:
-      case LZO_INDEX:
-        // Index files are read by the LZO scanner directly.
-      case GZIP:
-      case SNAPPY:
-      case BZIP2:
-      case NONE:
-        return true;
-      case DEFLATE:
-        // TODO: Ensure that text/deflate works correctly
-        if (this == TEXT) {
-          errorMsg.append("Expected compressed text file with {.lzo,.gzip,.snappy,.bz2} "
-              + "suffix: " + fileName);
-          return false;
-        } else {
-          return true;
-        }
-      default:
-        errorMsg.append("Unknown compression suffix: " + fileName);
-        return false;
-    }
-  }
-
   /**
    * Returns true if this file format with the given compression format is splittable.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e0850c6..1b05804 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -770,17 +770,6 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     } else {
       hmsParameters_ = Maps.newHashMap();
     }
-
-    // TODO: instead of raising an exception, we should consider marking this partition
-    // invalid and moving on, so that table loading won't fail and user can query other
-    // partitions.
-    for (FileDescriptor fileDescriptor: fileDescriptors_) {
-      StringBuilder errorMsg = new StringBuilder();
-      if (!getInputFormatDescriptor().getFileFormat().isFileCompressionTypeSupported(
-          fileDescriptor.getFileName(), errorMsg)) {
-        throw new RuntimeException(errorMsg.toString());
-      }
-    }
   }
 
   public HdfsPartition(HdfsTable table,

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test b/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
new file mode 100644
index 0000000..b141fd9
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/disable-lzo-plugin.test
@@ -0,0 +1,7 @@
+====
+---- QUERY
+# Test that running with plugin disabled fails gracefully.
+select * from functional_text_lzo.alltypes
+---- CATCH
+Scanner plugin 'LZO' is not one of the enabled plugins: ''
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test b/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
new file mode 100644
index 0000000..23199cc
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/unsupported-compression-partitions.test
@@ -0,0 +1,28 @@
+====
+---- QUERY
+# Test that querying only partitions with supported formats works as expected.
+select count(*)
+from multi_text_compression where month <= 2
+---- TYPES
+BIGINT
+---- RESULTS
+590
+====
+---- QUERY
+# Test that querying partition with unsupported plugin fails gracefully.
+select count(*)
+from multi_text_compression where month <= 3
+---- CATCH
+Scanner plugin 'LZ4' is not one of the enabled plugins: 'LZO'
+====
+---- QUERY
+# Unknown compression suffix is treated as uncompressed text.
+select id
+from multi_text_compression where month = 4
+---- RESULTS
+---- TYPES
+INT
+---- ERRORS
+Error converting column: 0 to INT
+Error parsing row: file: __HDFS_FILENAME__, before offset: 16
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/tests/custom_cluster/test_scanner_plugin.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scanner_plugin.py b/tests/custom_cluster/test_scanner_plugin.py
new file mode 100644
index 0000000..e30e6f5
--- /dev/null
+++ b/tests/custom_cluster/test_scanner_plugin.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestScannerPlugin(CustomClusterTestSuite):
+  """Tests that involve changing the scanner plugin option."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--enabled_hdfs_text_scanner_plugins=")
+  def test_disable_lzo_plugin(self, vector):
+    """Test that we can gracefully handle a disabled plugin."""
+    # Should be able to query valid partitions only.
+    self.run_test_case('QueryTest/disable-lzo-plugin', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/c3bc72bd/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index a6f635a..99eff1e 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -17,8 +17,9 @@
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
-from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.util.filesystem_utils import WAREHOUSE
+from tests.common.test_dimensions import (create_single_exec_option_dimension,
+    create_uncompressed_text_dimension)
+from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
 
 # Map from the test dimension file_format string to the SQL "STORED AS"
 # argument.
@@ -145,3 +146,80 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.client.execute("select * from %s" % FQ_TBL_IMP)
     # Make sure the table remains accessible in HIVE
     self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
+
+
+class TestPartitionMetadataUncompressedTextOnly(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartitionMetadataUncompressedTextOnly, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  @SkipIfLocal.hdfs_client
+  def test_unsupported_text_compression(self, vector, unique_database):
+    """Test querying tables with a mix of supported and unsupported compression codecs.
+    Should be able to query partitions with supported codecs."""
+    TBL_NAME = "multi_text_compression"
+    FQ_TBL_NAME = unique_database + "." + TBL_NAME
+    TBL_LOCATION = get_fs_path(
+        '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, TBL_NAME))
+
+    file_format = vector.get_value('table_format').file_format
+    # Clean up any existing data in the table directory.
+    self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True)
+    # Create the table
+    self.client.execute(
+        "create external table {0} like functional.alltypes location '{1}'".format(
+        FQ_TBL_NAME, TBL_LOCATION))
+
+    self.__add_alltypes_partition(vector, FQ_TBL_NAME, "functional", 2009, 1)
+    self.__add_alltypes_partition(vector, FQ_TBL_NAME, "functional_text_lzo", 2009, 2)
+
+    # Create a new partition with a bogus file with the unsupported LZ4 suffix.
+    lz4_year = 2009
+    lz4_month = 3
+    lz4_ym_partition_loc = self.__make_ym_partition_dir(TBL_LOCATION, lz4_year, lz4_month)
+    self.filesystem_client.create_file("{0}/fake.lz4".format(lz4_ym_partition_loc)[1:],
+        "some test data")
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location '{3}'".format(
+        FQ_TBL_NAME, lz4_year, lz4_month, lz4_ym_partition_loc))
+
+    # Create a new partition with a bogus compression codec.
+    fake_comp_year = 2009
+    fake_comp_month = 4
+    fake_comp_ym_partition_loc = self.__make_ym_partition_dir(
+        TBL_LOCATION, fake_comp_year, fake_comp_month)
+    self.filesystem_client.create_file(
+        "{0}/fake.fake_comp".format(fake_comp_ym_partition_loc)[1:], "fake compression")
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location '{3}'".format(
+        FQ_TBL_NAME, fake_comp_year, fake_comp_month, fake_comp_ym_partition_loc))
+
+    show_files_result = self.client.execute("show files in {0}".format(FQ_TBL_NAME))
+    assert len(show_files_result.data) == 4, "Expected one file per partition dir"
+
+    self.run_test_case('QueryTest/unsupported-compression-partitions', vector,
+        unique_database)
+
+  def __add_alltypes_partition(self, vector, dst_tbl, src_db, year, month):
+    """Add the (year, month) partition from ${db_name}.alltypes to dst_tbl."""
+    tbl_location = self._get_table_location("{0}.alltypes".format(src_db), vector)
+    part_location = "{0}/year={1}/month={2}".format(tbl_location, year, month)
+    self.client.execute(
+        "alter table {0} add partition (year={1}, month={2}) location '{3}'".format(
+        dst_tbl, year, month, part_location))
+
+  def __make_ym_partition_dir(self, tbl_location, year, month):
+    """Create the year/month partition directory and return the path."""
+    y_partition_loc = "{0}/year={1}".format(tbl_location, year)
+    ym_partition_loc = "{0}/month={1}".format(y_partition_loc, month)
+    self.filesystem_client.delete_file_dir(tbl_location[1:], recursive=True)
+    self.filesystem_client.make_dir(tbl_location[1:])
+    self.filesystem_client.make_dir(y_partition_loc[1:])
+    self.filesystem_client.make_dir(ym_partition_loc[1:])
+    return ym_partition_loc


[2/2] impala git commit: IMPALA-5384, part 2: Simplify Coordinator locking and clarify state

Posted by dh...@apache.org.
IMPALA-5384, part 2: Simplify Coordinator locking and clarify state

This is the final change to clarify and break up the Coordinator's lock.
The state machine for the coordinator is made explicit, distinguishing
between executing state and multiple terminal states. Logic to
transition into a terminal state is centralized in one location and
executes exactly once for each coordinator object.

Derived from a patch for IMPALA-5384 by Marcel Kornacker.
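
A condensed, standalone sketch of the new state machine (the enum values
and the single centralised transition mirror the diff below; locking via
exec_state_lock_, status propagation and resource release are omitted):

    #include <cassert>

    // The four states from the diff below; EXECUTING is the only non-terminal one.
    enum class ExecState { EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR };

    class CoordinatorSketch {
     public:
      // Centralised transition into a terminal state. It can be requested from
      // several places (client cancellation, backend error, eos), but the
      // terminal handling below runs exactly once per coordinator object.
      void TransitionTo(ExecState next) {
        assert(next != ExecState::EXECUTING);             // never re-enter EXECUTING
        if (exec_state_ != ExecState::EXECUTING) return;  // already terminal: no-op
        exec_state_ = next;
        HandleExecStateTransition(next);  // cancel backends, release resources, ...
      }

     private:
      void HandleExecStateTransition(ExecState /*next*/) { /* runs exactly once */ }
      ExecState exec_state_ = ExecState::EXECUTING;
    };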

Testing:
- exhaustive functional tests
- stress test on minicluster with memory overcommitment. Verified from
  the logs that this exercises all these paths:
  - successful queries
  - client requested cancellation
  - error from exec FInstances RPC
  - error reported asynchronously via report status RPC
  - eos before backend execution completed
- looped query_test & failure for 12 hours with no DCHECKs or crashes
  (an earlier version of this change had reproduced IMPALA-7030 and
  IMPALA-7033).

Change-Id: I6dc08da1295f1df3c9dce6d35d65d887b2c00a1c
Reviewed-on: http://gerrit.cloudera.org:8080/10440
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10465


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/75d19c87
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/75d19c87
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/75d19c87

Branch: refs/heads/2.x
Commit: 75d19c874f2daf7e42231a257a97c07367660226
Parents: c3bc72b
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Apr 13 16:51:25 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon May 21 19:57:46 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.h |   8 +
 be/src/runtime/coordinator.cc              | 428 ++++++++++++------------
 be/src/runtime/coordinator.h               | 331 +++++++++---------
 be/src/service/client-request-state.cc     |   2 +-
 be/src/service/impala-server.h             |   5 -
 be/src/util/counting-barrier.h             |  21 +-
 6 files changed, 400 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index d2f122c..e7af2e2 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,9 +21,17 @@
 #include <vector>
 #include <unordered_set>
 
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "runtime/coordinator.h"
+#include "scheduling/query-schedule.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a423de8..042605d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,6 +25,7 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
+#include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -39,6 +40,7 @@
 #include "runtime/query-state.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
+#include "scheduling/query-schedule.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -51,16 +53,13 @@
 
 using namespace apache::thrift;
 using namespace rapidjson;
-using namespace strings;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::filesystem::path;
-using std::unique_ptr;
 
-DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -76,11 +75,9 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(released_exec_resources_)
-      << "ReleaseExecResources() must be called before Coordinator is destroyed";
-  DCHECK(released_admission_control_resources_)
-      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
-      << "destroyed";
+  // Must have entered a terminal exec state guaranteeing resources were released.
+  DCHECK_NE(exec_state_, ExecState::EXECUTING);
+  // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -109,12 +106,6 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
-  // to keep things simple, make async Cancel() calls wait until plan fragment
-  // execution has been initiated, otherwise we might try to cancel fragment
-  // execution at Impala daemons where it hasn't even started
-  // TODO: revisit this, it may not be true anymore
-  lock_guard<mutex> l(lock_);
-
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
   query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -138,9 +129,9 @@ Status Coordinator::Exec() {
     InitFilterRoutingTable();
   }
 
-  // At this point, all static setup is done and all structures are initialized.
-  // Only runtime-related state changes past this point (examples:
-  // num_remaining_backends_, fragment instance profiles, etc.)
+  // At this point, all static setup is done and all structures are initialized. Only
+  // runtime-related state changes past this point (examples: fragment instance
+  // profiles, etc.)
 
   StartBackendExec();
   RETURN_IF_ERROR(FinishBackendStartup());
@@ -155,7 +146,7 @@ Status Coordinator::Exec() {
       // which means we failed Prepare
       Status prepare_status = query_state_->WaitForPrepare();
       DCHECK(!prepare_status.ok());
-      return prepare_status;
+      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
 
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -169,7 +160,6 @@ Status Coordinator::Exec() {
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
-
   return Status::OK();
 }
 
@@ -208,6 +198,8 @@ void Coordinator::InitFragmentStats() {
 void Coordinator::InitBackendStates() {
   int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
+
+  lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
   RuntimeProfile::Counter* num_backends_counter =
@@ -215,19 +207,13 @@ void Coordinator::InitBackendStates() {
   num_backends_counter->Set(num_backends);
 
   // create BackendStates
-  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
-  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
-    if (has_coord_fragment && coord_address == entry.first) {
-      coord_backend_idx_ = backend_idx;
-    }
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(query_id(), backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
-  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -341,8 +327,8 @@ void Coordinator::InitFilterRoutingTable() {
 
 void Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
-  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
-  num_remaining_backends_ = num_backends;
+  exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
+  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
 
   DebugOptions debug_options(schedule_.query_options());
 
@@ -354,11 +340,11 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
-            exec_complete_barrier_.get());
+              exec_rpcs_complete_barrier_.get());
         });
   }
+  exec_rpcs_complete_barrier_->Wait();
 
-  exec_complete_barrier_->Wait();
   VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(
@@ -367,26 +353,24 @@ void Coordinator::StartBackendExec() {
 }
 
 Status Coordinator::FinishBackendStartup() {
-  Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
   HistogramMetric latencies(def, 30 * 60 * 1000, 4);
+  Status status = Status::OK();
+  string error_hostname;
   for (BackendState* backend_state: backend_states_) {
     // preserve the first non-OK, if there is one
     Status backend_status = backend_state->GetStatus();
-    if (!backend_status.ok() && status.ok()) status = backend_status;
+    if (!backend_status.ok() && status.ok()) {
+      status = backend_status;
+      error_hostname = backend_state->impalad_address().hostname;
+    }
     latencies.Update(backend_state->rpc_latency());
   }
-
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
-
-  if (!status.ok()) {
-    query_status_ = status;
-    CancelInternal();
-  }
-  return status;
+  return UpdateExecState(status, nullptr, error_hostname);
 }
 
 string Coordinator::FilterDebugString() {
@@ -446,40 +430,118 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-Status Coordinator::GetStatus() {
-  lock_guard<mutex> l(lock_);
-  return query_status_;
+const char* Coordinator::ExecStateToString(const ExecState state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_str{
+    {ExecState::EXECUTING,        "EXECUTING"},
+    {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
+    {ExecState::CANCELLED,        "CANCELLED"},
+    {ExecState::ERROR,            "ERROR"}};
+  return exec_state_to_str.at(state);
 }
 
-Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
-    bool is_fragment_failure, const TUniqueId& instance_id) {
+Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
+  DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
+  Status ret_status;
   {
-    lock_guard<mutex> l(lock_);
-
-    // The query is done and we are just waiting for backends to clean up.
-    // Ignore their cancelled updates.
-    if (returned_all_results_ && status.IsCancelled()) return query_status_;
-
-    // nothing to update
-    if (status.ok()) return query_status_;
-
-    // don't override an error status; also, cancellation has already started
-    if (!query_status_.ok()) return query_status_;
-
-    query_status_ = status;
-    CancelInternal();
-  }
-
-  if (is_fragment_failure) {
-    // Log the id of the fragment that first failed so we can track it down more easily.
-    VLOG_QUERY << "query_id=" << PrintId(query_id())
-               << " failed because fragment_instance_id=" << PrintId(instance_id)
-               << " on host=" << backend_hostname << " failed.";
+    lock_guard<SpinLock> l(exec_state_lock_);
+    // May have already entered a terminal state, in which case nothing to do.
+    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+    DCHECK(exec_status_.ok()) << exec_status_;
+    exec_state_ = state;
+    if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
+    ret_status = exec_status_;
+  }
+  VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
+      state == ExecState::CANCELLED ? "cancelled" : "completed");
+  HandleExecStateTransition(ExecState::EXECUTING, state);
+  return ret_status;
+}
+
+Status Coordinator::UpdateExecState(const Status& status,
+    const TUniqueId* failed_finst, const string& instance_hostname) {
+  Status ret_status;
+  ExecState old_state, new_state;
+  {
+    lock_guard<SpinLock> l(exec_state_lock_);
+    old_state = exec_state_;
+    if (old_state == ExecState::EXECUTING) {
+      DCHECK(exec_status_.ok()) << exec_status_;
+      if (!status.ok()) {
+        // Error while executing - go to ERROR state.
+        exec_status_ = status;
+        exec_state_ = ExecState::ERROR;
+      }
+    } else if (old_state == ExecState::RETURNED_RESULTS) {
+      // Already returned all results. Leave exec status as ok, stay in this state.
+      DCHECK(exec_status_.ok()) << exec_status_;
+    } else if (old_state == ExecState::CANCELLED) {
+      // Client already requested cancellation; stay in this state. Errors that arrive
+      // after cancellation was requested are ignored.
+      DCHECK(exec_status_.IsCancelled()) << exec_status_;
+    } else {
+      // Already in the ERROR state, stay in this state but update status to be the
+      // first non-cancelled status.
+      DCHECK_EQ(old_state, ExecState::ERROR);
+      DCHECK(!exec_status_.ok());
+      if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
+        exec_status_ = status;
+      }
+    }
+    new_state = exec_state_;
+    ret_status = exec_status_;
+  }
+  // Log interesting status: a non-cancelled error, or a cancellation while executing.
+  if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
+    VLOG_QUERY << Substitute(
+        "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
+        PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
+        instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
+        status.GetDetail());
+  }
+  // After dropping the lock, apply the state transition (if any) side-effects.
+  HandleExecStateTransition(old_state, new_state);
+  return ret_status;
+}
+
+bool Coordinator::ReturnedAllResults() {
+  lock_guard<SpinLock> l(exec_state_lock_);
+  return exec_state_ == ExecState::RETURNED_RESULTS;
+}
+
+void Coordinator::HandleExecStateTransition(
+    const ExecState old_state, const ExecState new_state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_event{
+    {ExecState::EXECUTING,        "Executing"},
+    {ExecState::RETURNED_RESULTS, "Last row fetched"},
+    {ExecState::CANCELLED,        "Execution cancelled"},
+    {ExecState::ERROR,            "Execution error"}};
+  if (old_state == new_state) return;
+  // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
+  DCHECK_EQ(old_state, ExecState::EXECUTING);
+  // Should never transition to the initial state.
+  DCHECK_NE(new_state, ExecState::EXECUTING);
+  // Can't transition until the exec RPCs are no longer in progress. Otherwise, a
+  // cancel RPC could be missed, and resources freed before a backend has had a chance
+  // to take a resource reference.
+  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
+      exec_rpcs_complete_barrier_->pending() <= 0) << "exec rpcs not completed";
+
+  query_events_->MarkEvent(exec_state_to_event.at(new_state));
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
+  // This thread won the race to transition into a terminal state - terminate
+  // execution and release resources.
+  ReleaseExecResources();
+  if (new_state == ExecState::RETURNED_RESULTS) {
+    // TODO: IMPALA-6984: cancel all backends in this case too.
+    WaitForBackends();
   } else {
-    VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
-               << backend_hostname;
+    CancelBackends();
   }
-  return query_status_;
+  ReleaseAdmissionControlResources();
+  // Can compute summary only after we stop accepting reports from the backends. Both
+  // WaitForBackends() and CancelBackends() ensure that.
+  // TODO: should move this off of the query execution path?
+  ComputeQuerySummary();
 }
 
 Status Coordinator::FinalizeHdfsInsert() {
@@ -491,7 +553,7 @@ Status Coordinator::FinalizeHdfsInsert() {
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
-  Status return_status = GetStatus();
+  Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
     RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -517,22 +579,13 @@ Status Coordinator::FinalizeHdfsInsert() {
   return return_status;
 }
 
-Status Coordinator::WaitForBackendCompletion() {
-  unique_lock<mutex> l(lock_);
-  while (num_remaining_backends_ > 0 && query_status_.ok()) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, "
-               << num_remaining_backends_ << " remaining. query_id="
-               << PrintId(query_id());
-    backend_completion_cv_.Wait(l);
+void Coordinator::WaitForBackends() {
+  int32_t num_remaining = backend_exec_complete_barrier_->pending();
+  if (num_remaining > 0) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
+               << " remaining. query_id=" << PrintId(query_id());
+    backend_exec_complete_barrier_->Wait();
   }
-  if (query_status_.ok()) {
-    VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
-  } else {
-    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
-               << PrintId(query_id()) << ". " << query_status_.GetDetail();
-  }
-
-  return query_status_;
 }
 
 Status Coordinator::Wait() {
@@ -543,34 +596,22 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
-        runtime_state()->fragment_instance_id());
+    return UpdateExecState(coord_instance_->WaitForOpen(),
+        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
   }
-
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported relevant
-  // state. They only have relevant state to report in the parallel INSERT case,
-  // otherwise all the relevant state is from the coordinator fragment which will be
-  // available after Open() returns.  Ignore the returned status if finalization is
-  // required., since FinalizeHdfsInsert() will pick it up and needs to execute
-  // regardless.
-  Status status = WaitForBackendCompletion();
-  if (finalize_params() == nullptr && !status.ok()) return status;
-
-  // Execution of query fragments has finished. We don't need to hold onto query execution
-  // resources while we finalize the query.
-  ReleaseExecResources();
-  // Query finalization is required only for HDFS table sinks
-  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
-  // Release admission control resources after we'd done the potentially heavyweight
-  // finalization.
-  ReleaseAdmissionControlResources();
-
+  // DML finalization can only happen when all backends have completed all side-effects
+  // and reported relevant state.
+  WaitForBackends();
+  if (finalize_params() != nullptr) {
+    RETURN_IF_ERROR(UpdateExecState(
+            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
+  }
+  // DML requests are finished at this point.
+  RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
-  // For DML queries, when Wait is done, the query is complete.
-  ComputeQuerySummary();
-  return status;
+  return Status::OK();
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -578,88 +619,54 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  if (returned_all_results_) {
-    // May be called after the first time we set *eos. Re-set *eos and return here;
-    // already torn-down coord_sink_ so no more work to do.
+  // exec_state_lock_ is not needed here since this path won't execute concurrently
+  // with itself or DML finalization.
+  if (exec_state_ == ExecState::RETURNED_RESULTS) {
+    // Nothing left to do: already in a terminal state and no more results.
     *eos = true;
     return Status::OK();
   }
+  DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
+  DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
+  RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  DCHECK(coord_sink_ != nullptr)
-      << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
-      << "checked?";
-  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
-
-  // if there was an error, we need to return the query's error status rather than
-  // the status we just got back from the local executor (which may well be CANCELLED
-  // in that case).  Coordinator fragment failed in this case so we log the query_id.
-  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
-      runtime_state()->fragment_instance_id()));
-
-  if (*eos) {
-    returned_all_results_ = true;
-    query_events_->MarkEvent("Last row fetched");
-    // release query execution resources here, since we won't be fetching more result rows
-    ReleaseExecResources();
-    // wait for all backends to complete before computing the summary
-    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
-    RETURN_IF_ERROR(WaitForBackendCompletion());
-    // Release admission control resources after backends are finished.
-    ReleaseAdmissionControlResources();
-    // if the query completed successfully, compute the summary
-    if (query_status_.ok()) ComputeQuerySummary();
-  }
-
+  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
+  RETURN_IF_ERROR(UpdateExecState(
+          status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
+  if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   return Status::OK();
 }
 
-void Coordinator::Cancel(const Status* cause) {
-  lock_guard<mutex> l(lock_);
-  // if the query status indicates an error, cancellation has already been initiated;
-  // prevent others from cancelling a second time
-  if (!query_status_.ok()) return;
-
-  // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
-  // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
-  // of a successful query. Need to clean up relationship between query_status_ here and
-  // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
-  CancelInternal();
+void Coordinator::Cancel() {
+  // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
+  // RPC passing the exec RPC.
+  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
+      exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
+  discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
 }
 
-void Coordinator::CancelInternal() {
-  VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
-  // TODO: remove when restructuring cancellation, which should happen automatically
-  // as soon as the coordinator knows that the query is finished
-  DCHECK(!query_status_.ok());
-
+void Coordinator::CancelBackends() {
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
     DCHECK(backend_state != nullptr);
     if (backend_state->Cancel()) ++num_cancelled;
   }
+  backend_exec_complete_barrier_->NotifyRemaining();
+
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
-  backend_completion_cv_.NotifyAll();
-
-  ReleaseExecResourcesLocked();
-  ReleaseAdmissionControlResourcesLocked();
-  // Report the summary with whatever progress the query made before being cancelled.
-  ComputeQuerySummary();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
+  VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
+            << " backend_idx=" << params.coord_state_idx;
   if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
-  // TODO: return here if returned_all_results_?
-  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
-  // path more irregular.
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -668,46 +675,31 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
-    // This report made this backend done, so update the status and
-    // num_remaining_backends_.
-
-    // for now, abort the query if we see any error except if returned_all_results_ is
-    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
-    // TODO: clarify control flow here, it's unclear we should even process this status
-    // report if returned_all_results_ is true
+    // This backend execution has completed.
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    if (!status.ok() && !returned_all_results_) {
-      Status ignored =
-          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
-              is_fragment_failure, failed_instance_id);
-      return Status::OK();
-    }
-
-    lock_guard<mutex> l(lock_);
-    DCHECK_GT(num_remaining_backends_, 0);
-    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
-      VLOG_QUERY << "Backend completed: "
-          << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-          << " remaining=" << num_remaining_backends_ - 1
-          << " query_id=" << PrintId(query_id());
+    int pending_backends = backend_exec_complete_barrier_->Notify();
+    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
+      VLOG_QUERY << "Backend completed:"
+                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+                 << " remaining=" << pending_backends
+                 << " query_id=" << PrintId(query_id());
       BackendState::LogFirstInProgress(backend_states_);
     }
-    if (--num_remaining_backends_ == 0 || !status.ok()) {
-      backend_completion_cv_.NotifyAll();
+    if (!status.ok()) {
+      // We may start receiving status reports before all exec rpcs are complete.
+      // Can't apply state transition until no more exec rpcs will be sent.
+      exec_rpcs_complete_barrier_->Wait();
+      discard_result(UpdateExecState(status,
+              is_fragment_failure ? &failed_instance_id : nullptr,
+              TNetworkAddressToString(backend_state->impalad_address())));
     }
-    return Status::OK();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.
-  if (returned_all_results_) return Status::CANCELLED;
-
-  return Status::OK();
-}
-
-RuntimeState* Coordinator::runtime_state() {
-  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
+  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
+  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
 }
 
 // TODO: add histogram/percentile
@@ -740,20 +732,14 @@ void Coordinator::ComputeQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  for (BackendState* state: backend_states_) {
-    state->MergeErrorLog(&merged);
+  {
+    lock_guard<SpinLock> l(backend_states_init_lock_);
+    for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
 void Coordinator::ReleaseExecResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseExecResourcesLocked();
-}
-
-void Coordinator::ReleaseExecResourcesLocked() {
-  if (released_exec_resources_) return;
-  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -767,8 +753,6 @@ void Coordinator::ReleaseExecResourcesLocked() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
-  // Need to protect against failed Prepare(), where root_sink() would not be set.
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // Now that we've released our own resources, can release query-wide resources.
   if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -776,27 +760,20 @@ void Coordinator::ReleaseExecResourcesLocked() {
 }
 
 void Coordinator::ReleaseAdmissionControlResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseAdmissionControlResourcesLocked();
-}
-
-void Coordinator::ReleaseAdmissionControlResourcesLocked() {
-  if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admission control resources for query_id="
-            << PrintId(query_ctx().query_id);
+  LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
-  released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_complete_barrier_.get() != nullptr)
+  DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
-  exec_complete_barrier_->Wait();
+
+  exec_rpcs_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
@@ -867,6 +844,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params.__set_dst_query_id(query_id());
   rpc_params.__set_filter_id(params.filter_id);
 
+  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -940,23 +918,19 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
-const TUniqueId& Coordinator::query_id() const {
-  return query_ctx().query_id;
-}
-
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
 }
 
 MemTracker* Coordinator::query_mem_tracker() const {
-  return query_state()->query_mem_tracker();
+  return query_state_->query_mem_tracker();
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->ToJson(&val, doc);
@@ -969,7 +943,7 @@ void Coordinator::BackendsToJson(Document* doc) {
 void Coordinator::FInstanceStatsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->InstanceStatsToJson(&val, doc);
@@ -979,6 +953,14 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
 
+const TQueryCtx& Coordinator::query_ctx() const {
+  return schedule_.request().query_ctx;
+}
+
+const TUniqueId& Coordinator::query_id() const {
+  return query_ctx().query_id;
+}
+
 const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;
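
A minimal, standalone sketch (not this patch's code) of the transition discipline that
the new UpdateExecState()/SetNonErrorTerminalState() paths above implement: the first
caller to move the state out of EXECUTING wins, and only that caller runs the one-time
teardown after dropping the lock. All names below are illustrative only.

#include <mutex>

enum class ExecState { EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR };

class TerminalStateOnce {
 public:
  // Returns true iff this caller performed the EXECUTING -> terminal transition.
  bool TransitionTo(ExecState terminal) {
    ExecState old_state;
    {
      std::lock_guard<std::mutex> l(lock_);
      old_state = state_;
      if (old_state == ExecState::EXECUTING) state_ = terminal;
    }
    // Side effects run outside the lock and at most once per object lifetime.
    if (old_state == ExecState::EXECUTING) RunTerminalSideEffects();
    return old_state == ExecState::EXECUTING;
  }

 private:
  // Placeholder for cancelling backends, releasing resources, etc.
  void RunTerminalSideEffects() {}

  std::mutex lock_;
  ExecState state_ = ExecState::EXECUTING;
};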

http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 723047b..ae85bcd 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,26 +20,15 @@
 
 #include <string>
 #include <vector>
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
 #include <rapidjson/document.h>
 
 #include "common/global-types.h"
-#include "common/hdfs.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
-#include "runtime/query-state.h"
-#include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -56,6 +45,7 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class QueryResultSet;
+class QuerySchedule;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
@@ -65,10 +55,9 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either through cancellation or
-/// by returning eos, the coordinator releases resources. (Note that DML requests
-/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
-/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
+/// query, including cancellation. Once a query ends, whether by returning EOS, by
+/// client cancellation, by returning an error, or by finalizing a DML request, the
+/// coordinator releases resources.
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -77,80 +66,81 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Thread-safe, with the exception of GetNext().
-//
+/// Thread-safe except where noted.
+///
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
-/// The implementation ensures that setting an overall error status and initiating
-/// cancellation of all fragment instances is atomic.
+/// A query is considered to be executing until one of three things occurs:
+/// 1. An error is encountered. Backend cancellation is automatically initiated for all
+///    backends that haven't yet completed and the overall query status is set to the
+///    first non-cancelled error status encountered.
+/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
+///    is set to CANCELLED and cancellation is initiated for all backends still
+///    executing (without an error status).
+/// 3. The query has returned all rows. The overall query status is OK (and remains
+///    OK). Client cancellation is no longer possible and subsequent backend errors are
+///    ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
+///
+/// Lifecycle: this object must not be destroyed until after one of the three states
+/// above is reached (error, cancelled, or EOS) to ensure resources are released.
+///
+/// Lock ordering: (lower-numbered acquired before higher-numbered)
+/// 1. wait_lock_
+/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
+///    ExecSummary::lock (leaves)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
-/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
-/// TODO: clarify cancellation path; in particular, cancel as soon as we return
-/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
   Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. When it returns,
-  /// all fragment instances have started executing at their respective backends.
-  /// A call to Exec() must precede all other member function calls.
+  /// Initiate asynchronous execution of a query with the given schedule. When it
+  /// returns, all fragment instances have started executing at their respective
+  /// backends. Exec() must be called exactly once and a call to Exec() must precede
+  /// all other member function calls.
   Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
-  /// query doesn't return rows, until the query finishes or is cancelled.
-  /// A call to Wait() must precede all calls to GetNext().
-  /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
-  /// Wait() calls concurrently.
+  /// query doesn't return rows, until the query finishes or is cancelled. A call to
+  /// Wait() must precede all calls to GetNext().  Multiple calls to Wait() are
+  /// idempotent and it is okay to issue multiple Wait() calls concurrently.
   Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
-  /// rows, but will not return more.
-  ///
-  /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
-  /// no-op.
-  ///
-  /// GetNext() will not set *eos=true until all fragment instances have either completed
-  /// or have failed.
-  ///
-  /// Returns an error status if an error was encountered either locally or by any of the
-  /// remote fragments or if the query was cancelled.
+  /// rows, but will not return more. If *eos is true, all rows have been returned.
+  /// Returns a non-OK status if an error was encountered either locally or by any of
+  /// the executing backends, or if the query was cancelled via Cancel().  After *eos
+  /// is true, subsequent calls to GetNext() will be a no-op.
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
-  /// calls (but may call any of the other member functions concurrently with GetNext()).
+  /// calls.
   Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Cancel execution of query. This includes the execution of the local plan fragment,
-  /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
-  /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
-  /// Idempotent.
-  void Cancel(const Status* cause = nullptr);
+  /// Cancels execution of the query and sets the overall query status to CANCELLED
+  /// if the query is still executing. Idempotent.
+  void Cancel();
 
-  /// Updates execution status of a particular backend as well as dml_exec_state_.
-  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
-  /// error status.
+  /// Called by the report status RPC handler to update execution status of a
+  /// particular backend as well as dml_exec_state_ and the profile. This may block if
+  /// exec RPCs are pending.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Only valid to call after Exec().
-  QueryState* query_state() const { return query_state_; }
-
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_; }
 
-  const TUniqueId& query_id() const;
-
+  /// Safe to call only after Exec().
   MemTracker* query_mem_tracker() const;
 
-  /// This is safe to call only after Wait()
+  /// Safe to call only after Wait().
   DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
@@ -159,9 +149,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const ProgressUpdater& progress() { return progress_; }
 
-  /// Returns query_status_.
-  Status GetStatus();
-
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);
 
@@ -188,18 +175,20 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
-  /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
+  /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
   TStmtType::type stmt_type_;
 
-  /// BackendStates for all execution backends, including the coordinator.
-  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
-  /// InitBackendExec().
+  /// BackendStates for all execution backends, including the coordinator. All elements
+  /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
   std::vector<BackendState*> backend_states_;
 
-  // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment
-  int coord_backend_idx_ = -1;
+  /// Protects the population of the backend_states_ vector (not the BackendState
+  /// objects). Used when accessing backend_states_ if it's not guaranteed that
+  /// InitBackendStates() has completed.
+  SpinLock backend_states_init_lock_;
 
-  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
+  /// The QueryState for this coordinator. Reference taken in Exec(). Reference
+  /// released in destructor.
   QueryState* query_state_ = nullptr;
 
   /// Non-null if and only if the query produces results for the client; i.e. is of
@@ -210,12 +199,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
-  /// reference of QueryState released) in TearDown().
+  /// Owned by the QueryState. Set in Exec().
   FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
-  /// GetNext() hits eos.
+  /// Owned by the QueryState. Set in Exec().
   PlanRootSink* coord_sink_ = nullptr;
 
   /// ensures single-threaded execution of Wait(). See lock ordering class comment.
@@ -223,9 +210,17 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
-  /// Keeps track of number of completed ranges and total scan ranges.
+  /// Keeps track of number of completed ranges and total scan ranges. Initialized by
+  /// Exec().
   ProgressUpdater progress_;
 
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
+  RuntimeProfile* query_profile_ = nullptr;
+
+  /// Total time spent in finalization (typically 0 except for INSERT into hdfs
+  /// tables). Set in Exec().
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+
   /// Total number of filter updates received (always 0 if filter mode is not
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -256,6 +251,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     void Init(const QuerySchedule& query_schedule);
   };
 
+  // Initialized by Exec().
   ExecSummary exec_summary_;
 
   /// Filled in as the query completes and tracks the results of DML queries.  This is
@@ -263,52 +259,40 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// coordinator fragment: only one of the two can legitimately produce updates.
   DmlExecState dml_exec_state_;
 
-  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
-  RuntimeProfile* query_profile_ = nullptr;
-
-  /// Protects all fields below. This is held while making RPCs, so this lock should
-  /// only be acquired if the acquiring thread is prepared to wait for a significant
-  /// time.
-  /// TODO: clarify to what extent the fields below need to be protected by lock_
-  /// Lock ordering is
-  /// 1. wait_lock_
-  /// 2. lock_
-  /// 3. BackendState::lock_
-  /// 4. filter_lock_
-  boost::mutex lock_;
-
-  /// Overall status of the entire query; set to the first reported fragment error
-  /// status or to CANCELLED, if Cancel() is called.
-  Status query_status_;
-
-  /// If true, the query is done returning all results.  It is possible that the
-  /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
-  /// with limit)
-  /// Once this is set to true, errors from execution backends are ignored.
-  bool returned_all_results_ = false;
-
-  /// If there is no coordinator fragment, Wait() simply waits until all
-  /// backends report completion by notifying on backend_completion_cv_.
-  /// Tied to lock_.
-  ConditionVariable backend_completion_cv_;
-
-  /// Count of the number of backends for which done != true. When this
-  /// hits 0, any Wait()'ing thread is notified
-  int num_remaining_backends_ = 0;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-  /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(),
-  /// elements live in obj_pool().
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in
+  /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
+  /// sequentially, without synchronization.
   std::vector<FragmentStats*> fragment_stats_;
 
-  /// total time spent in finalization (typically 0 except for INSERT into hdfs tables)
-  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+  /// Barrier that is released when all calls to BackendState::Exec() have
+  /// returned. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
+
+  /// Barrier that is released when all backends have indicated execution completion,
+  /// or when all backends are cancelled due to an execution error or client requested
+  /// cancellation. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
+
+  SpinLock exec_state_lock_; // protects exec_state_ and exec_status_
 
-  /// Barrier that is released when all calls to ExecRemoteFragment() have
-  /// returned, successfully or not. Initialised during Exec().
-  boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
+  /// EXECUTING: in-flight; the only non-terminal state
+  /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
+  /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
+  /// ERROR: received an error from a backend
+  enum class ExecState {
+    EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
+  };
+  ExecState exec_state_ = ExecState::EXECUTING;
+
+  /// Overall execution status; only set on exec_state_ transitions:
+  /// - EXECUTING: OK
+  /// - RETURNED_RESULTS: OK
+  /// - CANCELLED: CANCELLED
+  /// - ERROR: error status
+  Status exec_status_;
 
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
@@ -321,12 +305,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if ReleaseExecResources() has been called.
-  bool released_exec_resources_ = false;
-
-  /// True if and only if ReleaseAdmissionControlResources() has been called.
-  bool released_admission_control_resources_ = false;
-
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -334,36 +312,67 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// HDFS INSERT finalization is not required.
   const TFinalizeParams* finalize_params() const;
 
-  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+  const TQueryCtx& query_ctx() const;
 
-  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
-  /// produce any rows.
-  RuntimeState* runtime_state();
+  const TUniqueId& query_id() const;
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when
-  /// the query is not being cancelled in the case where the query limit is reached.
-  void CancelInternal();
-
-  /// Acquires lock_ and updates query_status_ with 'status' if it's not already
-  /// an error status, and returns the current query_status_. The status may be
-  /// due to an error in a specific fragment instance, or it can be a general error
-  /// not tied to a specific fragment instance.
-  /// Calls CancelInternal() when switching to an error status.
-  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
-  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
-  /// reporting. For a general error not tied to a specific instance,
-  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
-  /// 'backend_hostname' is used for error reporting in either case.
-  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
-      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
-
-  /// Returns only when either all execution backends have reported success or the query
-  /// is in error. Returns the status of the query.
-  /// It is safe to call this concurrently, but any calls must be made only after Exec().
-  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
+  /// Called when the query is done executing due to reaching EOS or client
+  /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
+  /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
+  /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
+  /// either way, calls ComputeQuerySummary() and releases resources). Returns the
+  /// resulting overall execution status.
+  Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
+
+  /// Transitions 'exec_state_' given an execution status and returns the resulting
+  /// overall status:
+  ///
+  /// - if the 'status' parameter is ok, no state transition
+  /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
+  /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
+  ///   transition state (those are terminal states); however, in the ERROR case the
+  ///   stored status may be updated to the first non-cancelled error status.
+  ///
+  /// Should not be called for (client initiated) cancellation. Call
+  /// SetNonErrorTerminalState(CANCELLED) instead.
+  ///
+  /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
+  /// failure is not specific to a fragment instance), used for error reporting along
+  /// with 'instance_hostname'.
+  Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
+      const string& instance_hostname) WARN_UNUSED_RESULT;
+
+  /// Helper for SetNonErrorTerminalState() and UpdateExecState(). If the caller
+  /// transitioned to a terminal state (which happens exactly once for the lifetime of
+  /// the Coordinator object), then finalizes execution (cancels remaining backends if
+  /// transitioning to CANCELLED; in all cases releases resources and calls
+  /// ComputeQuerySummary()). Must not be called if exec RPCs are pending.
+  void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
+
+  /// Return true if 'exec_state_' is RETURNED_RESULTS.
+  /// TODO: remove with IMPALA-6984.
+  bool ReturnedAllResults() WARN_UNUSED_RESULT;
+
+  /// Return the string representation of 'state'.
+  static const char* ExecStateToString(const ExecState state);
+
+  // For DCHECK_EQ, etc., of ExecState values.
+  friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
+    return o << ExecStateToString(s);
+  }
+
+  /// Helper for HandleExecStateTransition(). Sends cancellation request to all
+  /// executing backends but does not wait for acknowledgement from the backends. The
+  /// ExecState state-machine ensures this is called at most once.
+  void CancelBackends();
+
+  /// Returns only when either all execution backends have reported success or a request
+  /// to cancel the backends has already been sent. It is safe to call this concurrently,
+  /// but any calls must be made only after Exec().
+  void WaitForBackends();
 
   /// Initializes fragment_stats_ and query_profile_. Must be called before
   /// InitBackendStates().
@@ -385,36 +394,33 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// finishing the INSERT in flight.
   Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
-  /// Populates backend_states_, starts query execution at all backends in parallel, and
-  /// blocks until startup completes.
+  /// Helper for Exec(). Populates backend_states_, starts query execution at all
+  /// backends in parallel, and blocks until startup completes.
   void StartBackendExec();
 
-  /// Calls CancelInternal() and returns an error if there was any error starting
-  /// backend execution.
-  /// Also updates query_profile_ with the startup latency histogram.
+  /// Helper for Exec(). Checks for errors encountered when starting backend execution,
+  /// using the first non-OK status, if any, as the overall status. Returns the overall
+  /// status. Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
+  /// Helper for HandleExecStateTransition(). Releases all resources associated with
+  /// query execution. The ExecState state-machine ensures this is called exactly once.
   void ReleaseExecResources();
 
-  /// Same as ReleaseExecResources() except the lock must be held by the caller.
-  void ReleaseExecResourcesLocked();
-
-  /// Releases admission control resources for use by other queries.
-  /// This should only be called if one of following preconditions is satisfied for each
-  /// backend on which the query is executing:
-  /// * The backend finished execution.
-  ///   Rationale: the backend isn't consuming resources.
-  //
+  /// Helper for HandleExecStateTransition(). Releases admission control resources for
+  /// use by other queries. This should only be called if one of following
+  /// preconditions is satisfied for each backend on which the query is executing:
+  ///
+  /// * The backend finished execution.  Rationale: the backend isn't consuming
+  ///   resources.
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
   ///   time a newly admitted query fragment starts up on the backend and starts consuming
   ///   resources, the resources from this query will probably have been released.
-  //
   /// * Sending the cancellation RPC to the backend failed
   ///   Rationale: the backend is either down or will tear itself down when it next tries
   ///   to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -425,16 +431,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///   pessimistically queueing queries while we wait for a response from a backend that
   ///   may never come.
   ///
-  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
-  /// sufficient to satisfy the above preconditions. If the query has an expensive
-  /// finalization step post query execution (e.g. a DML statement), then this should
-  /// be called after that completes to avoid over-admitting queries.
+  /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
+  /// to satisfy the above preconditions. If the query has an expensive finalization
+  /// step post query execution (e.g. a DML statement), then this should be called
+  /// after that completes to avoid over-admitting queries.
   ///
-  /// Acquires lock_. Idempotent.
+  /// The ExecState state-machine ensures this is called exactly once.
   void ReleaseAdmissionControlResources();
-
-  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
-  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }
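
For readers following the class comment above, a hedged sketch of the caller-side
sequence it documents (Exec(), then Wait(), then GetNext() until eos). The driver
function, the includes, and the max_rows value are assumptions for illustration and
are not part of this patch.

#include "common/status.h"
#include "runtime/coordinator.h"

Status RunToCompletion(impala::Coordinator* coord, impala::QueryResultSet* results) {
  RETURN_IF_ERROR(coord->Exec());  // must be called exactly once, before anything else
  RETURN_IF_ERROR(coord->Wait());  // must precede all GetNext() calls
  bool eos = false;
  while (!eos) {
    // Fills 'results' with up to 1024 rows; sets eos once all rows have been returned.
    RETURN_IF_ERROR(coord->GetNext(results, /*max_rows=*/1024, &eos));
    // ... hand rows to the client; after eos, further GetNext() calls are no-ops ...
  }
  return impala::Status::OK();
}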

http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 12b9b78..2ca1256 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
 
   // Cancel the parent query. 'lock_' should not be held because cancellation involves
   // RPCs and can block for a long time.
-  if (coord != NULL) coord->Cancel(cause);
+  if (coord != NULL) coord->Cancel();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index fb3f261..3af4c9b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,11 +111,6 @@ class ClientRequestState;
 /// 6. ClientRequestState::expiration_data_lock_
 /// 7. Coordinator::exec_summary_lock
 ///
-/// Coordinator::lock_ should not be acquired at the same time as the
-/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
-/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of
-/// the above lock ordering.
-///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
 /// * session_timeout_lock_

http://git-wip-us.apache.org/repos/asf/impala/blob/75d19c87/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 49b0bde..827c526 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,8 +33,23 @@ class CountingBarrier {
   }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
-  void Notify() {
-    if (count_.Add(-1) == 0) promise_.Set(true);
+  /// Returns the remaining pending notifications.
+  int32_t Notify() {
+    int32_t result = count_.Add(-1);
+    if (result == 0) promise_.Set(true);
+    return result;
+  }
+
+  /// Sets the number of pending notifications to 0 and unblocks Wait().
+  void NotifyRemaining() {
+    while (true) {
+      int32_t value = count_.Load();
+      if (value <= 0) return;  // count_ can legitimately drop below 0
+      if (count_.CompareAndSwap(value, 0)) {
+        promise_.Set(true);
+        return;
+      }
+    }
   }
 
   /// Blocks until all notifications are received.
@@ -44,6 +59,8 @@ class CountingBarrier {
   /// case '*timed_out' will be true.
   void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
 
+  int32_t pending() const { return count_.Load(); }
+
  private:
   /// Used to signal waiters when all notifications are received.
   Promise<bool> promise_;
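
A hedged usage sketch of the extended CountingBarrier interface, mirroring how the
coordinator uses it above: each worker calls Notify() once, an error or cancel path may
call NotifyRemaining() to release the waiter early, and pending() reports how many
notifications are still outstanding. The thread wiring below is illustrative only.

#include <thread>
#include <vector>
#include "util/counting-barrier.h"

void ExampleBarrierUsage(int num_workers) {
  impala::CountingBarrier barrier(num_workers);
  std::vector<std::thread> workers;
  for (int i = 0; i < num_workers; ++i) {
    workers.emplace_back([&barrier]() {
      // ... per-worker work ...
      barrier.Notify();  // returns the number of notifications still pending
    });
  }
  // A cancel path could instead call barrier.NotifyRemaining() to unblock Wait() early.
  barrier.Wait();  // blocks until all notifications have been received
  for (std::thread& t : workers) t.join();
}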