You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2017/03/18 00:28:52 UTC

[3/4] incubator-impala git commit: IMPALA-4996: Single-threaded KuduScanNode

IMPALA-4996: Single-threaded KuduScanNode

This introduces KuduScanNodeMt, the single-threaded version
of KuduScanNode that materializes the tuples in GetNext().
KuduScanNodeMt is enabled by the same condition as
HdfsScanNodeMt: mt_dop is greater than or equal to 1.

To share code between the two implementations, KuduScanNode
and KuduScanNodeMt are now subclasses of KuduScanNodeBase,
which implements the shared code. The KuduScanner is
minimally impacted, as it already had the required GetNext
interface.

Since the KuduClient is a heavy-weight object, it is now
shared at the QueryState level. We try to share the
KuduClient as much as possible, but there are times when
the KuduClient cannot be shared. Each Kudu table has
master addresses stored in the Hive Metastore. We only
share KuduClients for tables that have an identical value
for the master addresses. In the ideal case, every Kudu
table will have the same value, but there is no explicit
guarantee of this.

The testing for this is a modified version of
kudu-scan-node.test run with various mt_dop values.

Change-Id: I6e4593300e376bc508b78acaea64ffdd2c73a67a
Reviewed-on: http://gerrit.cloudera.org:8080/6312
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5bb988b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5bb988b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5bb988b1

Branch: refs/heads/master
Commit: 5bb988b1c58a2377777312c8e4dd56cbd0dee8a2
Parents: 6dff906
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Mar 3 17:47:23 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Mar 17 19:33:31 2017 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   2 +
 be/src/exec/exec-node.cc                        |  11 +-
 be/src/exec/hdfs-scan-node-base.h               |   2 -
 be/src/exec/kudu-scan-node-base.cc              | 143 +++++++++++++++++++
 be/src/exec/kudu-scan-node-base.h               | 112 +++++++++++++++
 be/src/exec/kudu-scan-node-mt.cc                | 106 ++++++++++++++
 be/src/exec/kudu-scan-node-mt.h                 |  54 +++++++
 be/src/exec/kudu-scan-node.cc                   | 104 ++------------
 be/src/exec/kudu-scan-node.h                    |  46 +-----
 be/src/exec/kudu-scanner.cc                     |   4 +-
 be/src/exec/kudu-scanner.h                      |   6 +-
 be/src/runtime/query-state.cc                   |  25 ++++
 be/src/runtime/query-state.h                    |  25 ++++
 common/thrift/PlanNodes.thrift                  |   4 +
 .../org/apache/impala/planner/KuduScanNode.java |  12 ++
 .../queries/QueryTest/mt-dop-kudu.test          |  39 +++++
 tests/query_test/test_mt_dop.py                 |  11 ++
 17 files changed, 563 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3d236a8..1193660 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -82,6 +82,8 @@ add_library(Exec
   plan-root-sink.cc
   kudu-scanner.cc
   kudu-scan-node.cc
+  kudu-scan-node-base.cc
+  kudu-scan-node-mt.cc
   kudu-table-sink.cc
   kudu-util.cc
   read-write-util.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 518a0dd..02d4bf6 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -37,6 +37,7 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/kudu-scan-node.h"
+#include "exec/kudu-scan-node-mt.h"
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
 #include "exec/partitioned-aggregation-node.h"
@@ -271,7 +272,6 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
   stringstream error_msg;
   switch (tnode.node_type) {
     case TPlanNodeType::HDFS_SCAN_NODE:
-      *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
       if (tnode.hdfs_scan_node.use_mt_scan_node) {
         DCHECK_GT(state->query_options().mt_dop, 0);
         *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
@@ -289,7 +289,14 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       break;
     case TPlanNodeType::KUDU_SCAN_NODE:
       RETURN_IF_ERROR(CheckKuduAvailability());
-      *node = pool->Add(new KuduScanNode(pool, tnode, descs));
+      if (tnode.kudu_scan_node.use_mt_scan_node) {
+        DCHECK_GT(state->query_options().mt_dop, 0);
+        *node = pool->Add(new KuduScanNodeMt(pool, tnode, descs));
+      } else {
+        DCHECK(state->query_options().mt_dop == 0
+            || state->query_options().num_scanner_threads == 1);
+        *node = pool->Add(new KuduScanNode(pool, tnode, descs));
+      }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
       if (FLAGS_enable_partitioned_aggregation) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 63d05be..cde9574 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -128,8 +128,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// to a queue, false otherwise.
   virtual bool HasRowBatchQueue() const = 0;
 
-  int limit() const { return limit_; }
-
   const std::vector<SlotDescriptor*>& materialized_slots()
       const { return materialized_slots_; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
new file mode 100644
index 0000000..fd09d6d
--- /dev/null
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/kudu-scan-node-base.h"
+
+#include <boost/algorithm/string.hpp>
+#include <kudu/client/row_result.h>
+#include <kudu/client/schema.h>
+#include <kudu/client/value.h>
+#include <thrift/protocol/TDebugProtocol.h>
+#include <vector>
+
+#include "exec/kudu-scanner.h"
+#include "exec/kudu-util.h"
+#include "exprs/expr.h"
+#include "runtime/mem-pool.h"
+#include "runtime/query-state.h"
+#include "runtime/runtime-state.h"
+#include "runtime/row-batch.h"
+#include "runtime/string-value.h"
+#include "runtime/tuple-row.h"
+#include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduTable;
+
+namespace impala {
+
+const string KuduScanNodeBase::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
+const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
+
+KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs)
+    : ScanNode(pool, tnode, descs),
+      tuple_id_(tnode.kudu_scan_node.tuple_id),
+      client_(nullptr),
+      counters_running_(false),
+      next_scan_token_idx_(0) {
+  DCHECK(KuduIsAvailable());
+}
+
+KuduScanNodeBase::~KuduScanNodeBase() {
+  DCHECK(is_closed());
+}
+
+Status KuduScanNodeBase::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(ScanNode::Prepare(state));
+  runtime_state_ = state;
+
+  scan_ranges_complete_counter_ =
+      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
+  kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
+  kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
+  counters_running_ = true;
+
+  DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
+  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+
+  // Initialize the list of scan tokens to process from the TScanRangeParams.
+  DCHECK(scan_range_params_ != NULL);
+  int num_remote_tokens = 0;
+  for (const TScanRangeParams& params: *scan_range_params_) {
+    if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens;
+    scan_tokens_.push_back(params.scan_range.kudu_scan_token);
+  }
+  COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);
+
+  return Status::OK();
+}
+
+Status KuduScanNodeBase::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+
+  const KuduTableDescriptor* table_desc =
+      static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
+
+  RETURN_IF_ERROR(runtime_state_->query_state()->GetKuduClient(
+      table_desc->kudu_master_addresses(), &client_));
+
+  uint64_t latest_ts = static_cast<uint64_t>(
+      max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
+  VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
+  if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
+
+  KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
+      "Unable to open Kudu table");
+  return Status::OK();
+}
+
+void KuduScanNodeBase::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  StopAndFinalizeCounters();
+  ExecNode::Close(state);
+}
+
+void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const {
+  string indent(indentation_level * 2, ' ');
+  *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
+}
+
+bool KuduScanNodeBase::HasScanToken() {
+  return (next_scan_token_idx_ < scan_tokens_.size());
+}
+
+const string* KuduScanNodeBase::GetNextScanToken() {
+  if (!HasScanToken()) return nullptr;
+  const string* token = &scan_tokens_[next_scan_token_idx_++];
+  return token;
+}
+
+void KuduScanNodeBase::StopAndFinalizeCounters() {
+  if (!counters_running_) return;
+  counters_running_ = false;
+
+  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
+  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
+}
+
+Status KuduScanNodeBase::GetConjunctCtxs(vector<ExprContext*>* ctxs) {
+  return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs);
+}
+
+}  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
new file mode 100644
index 0000000..70e5b94
--- /dev/null
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_KUDU_SCAN_NODE_BASE_H_
+#define IMPALA_EXEC_KUDU_SCAN_NODE_BASE_H_
+
+#include <gtest/gtest.h>
+#include <kudu/client/client.h>
+
+#include "exec/scan-node.h"
+#include "runtime/descriptors.h"
+
+namespace impala {
+
+class KuduScanner;
+
+/// Base class for the two Kudu scan node implementations. Contains the code that is
+/// independent of whether the rows are materialized by scanner threads (KuduScanNode)
+/// or by the thread calling GetNext (KuduScanNodeMt). This class is not thread safe
+/// for concurrent access. Subclasses are responsible for implementing thread safety.
+/// TODO: This class can be removed when the old multi-threaded implementation
+/// (KuduScanNode, which uses scanner threads) is removed.
+class KuduScanNodeBase : public ScanNode {
+ public:
+  KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~KuduScanNodeBase();
+
+  virtual Status Prepare(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+
+  /// Returns the total number of scan tokens
+  int NumScanTokens() { return scan_tokens_.size(); }
+
+  /// Returns whether there are any scan tokens remaining. Not thread safe.
+  bool HasScanToken();
+
+  /// Returns the next scan token. Returns NULL if there are no more scan tokens.
+  /// Not thread safe, access must be synchronized.
+  const std::string* GetNextScanToken();
+
+  RuntimeState* runtime_state_;
+
+  /// Stops periodic counters and aggregates counter values for the entire scan node.
+  /// This should be called as soon as the scan node is complete to get the most accurate
+  /// counter values.
+  /// This can be called multiple times, subsequent calls will be ignored.
+  /// This must be called on Close() to unregister counters.
+  /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
+  void StopAndFinalizeCounters();
+
+ private:
+  friend class KuduScanner;
+
+  /// Tuple id resolved in Prepare() to set tuple_desc_.
+  const TupleId tuple_id_;
+
+  /// Descriptor of tuples read from Kudu table.
+  const TupleDescriptor* tuple_desc_;
+
+  /// Pointer to the KuduClient, which is stored on the QueryState and shared between
+  /// scanners and fragment instances.
+  kudu::client::KuduClient* client_;
+
+  /// Kudu table reference. Shared between scanner threads for KuduScanNode.
+  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
+
+  /// If true, counters are actively running and need to be reported in the runtime
+  /// profile.
+  bool counters_running_;
+
+  /// Set of scan tokens to be deserialized into Kudu scanners.
+  std::vector<std::string> scan_tokens_;
+
+  /// The next index in 'scan_tokens_' to be assigned.
+  int next_scan_token_idx_;
+
+  RuntimeProfile::Counter* kudu_round_trips_;
+  RuntimeProfile::Counter* kudu_remote_tokens_;
+  static const std::string KUDU_ROUND_TRIPS;
+  static const std::string KUDU_REMOTE_TOKENS;
+
+  /// Returns a cloned copy of the scan node's conjuncts. Requires that the expressions
+  /// have been opened previously.
+  Status GetConjunctCtxs(vector<ExprContext*>* ctxs);
+
+  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
+  kudu::client::KuduClient* kudu_client() { return client_; }
+  RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
new file mode 100644
index 0000000..8723daa
--- /dev/null
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/kudu-scan-node-mt.h"
+
+#include <thrift/protocol/TDebugProtocol.h>
+#include <vector>
+
+#include "exec/kudu-scanner.h"
+#include "exec/kudu-util.h"
+
+#include "runtime/runtime-state.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+KuduScanNodeMt::KuduScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs)
+    : KuduScanNodeBase(pool, tnode, descs),
+      scan_token_(nullptr) {
+  DCHECK(KuduIsAvailable());
+}
+
+KuduScanNodeMt::~KuduScanNodeMt() {
+  DCHECK(is_closed());
+}
+
+Status KuduScanNodeMt::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(KuduScanNodeBase::Open(state));
+  scanner_.reset(new KuduScanner(this, runtime_state_));
+  RETURN_IF_ERROR(scanner_->Open());
+  return Status::OK();
+}
+
+Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  DCHECK(row_batch != NULL);
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  *eos = false;
+
+  if (scan_token_ == nullptr) {
+    scan_token_ = GetNextScanToken();
+    if (scan_token_ == nullptr) {
+      StopAndFinalizeCounters();
+      scanner_->Close();
+      scanner_.reset();
+      *eos = true;
+      return Status::OK();
+    }
+    RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_));
+  }
+
+  bool scanner_eos = false;
+  RETURN_IF_ERROR(scanner_->GetNext(row_batch, &scanner_eos));
+  if (scanner_eos) {
+    scan_ranges_complete_counter()->Add(1);
+    scan_token_ = nullptr;
+  }
+  scanner_->KeepKuduScannerAlive();
+
+  num_rows_returned_ += row_batch->num_rows();
+  if (ReachedLimit()) {
+    int num_rows_over = num_rows_returned_ - limit_;
+    row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
+    num_rows_returned_ -= num_rows_over;
+    scan_token_ = nullptr;
+    StopAndFinalizeCounters();
+    scanner_->Close();
+    scanner_.reset();
+    *eos = true;
+  }
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+
+  return Status::OK();
+}
+
+void KuduScanNodeMt::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  if (scanner_.get() != nullptr) scanner_->Close();
+  scanner_.reset();
+  KuduScanNodeBase::Close(state);
+}
+
+}  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.h b/be/src/exec/kudu-scan-node-mt.h
new file mode 100644
index 0000000..4812e46
--- /dev/null
+++ b/be/src/exec/kudu-scan-node-mt.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_KUDU_SCAN_NODE_MT_H_
+#define IMPALA_EXEC_KUDU_SCAN_NODE_MT_H_
+
+#include "exec/kudu-scan-node-base.h"
+
+namespace impala {
+
+class KuduScanner;
+
+/// A scan node that scans a Kudu table.
+///
+/// This takes a set of serialized Kudu scan tokens which encode the information needed
+/// for this scan. This materializes the tuples, evaluates conjuncts and runtime filters
+/// in the thread calling GetNext().
+class KuduScanNodeMt : public KuduScanNodeBase {
+ public:
+  KuduScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+  ~KuduScanNodeMt();
+
+  virtual Status Open(RuntimeState* state);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual void Close(RuntimeState* state);
+
+ private:
+  /// Current scan token and corresponding scanner.
+  /// TODO: The KuduScanner should be merged into this class when we get rid of the
+  /// non-MT version of this class. It is not necessary for the scan node to
+  /// be separate from the scanner in the single threaded model. This would remove a
+  /// layer of indirection and clean up the code.
+  const std::string* scan_token_;
+  std::unique_ptr<KuduScanner> scanner_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index a238e3c..c345748 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -17,27 +17,16 @@
 
 #include "exec/kudu-scan-node.h"
 
-#include <boost/algorithm/string.hpp>
-#include <kudu/client/row_result.h>
-#include <kudu/client/schema.h>
-#include <kudu/client/value.h>
 #include <thrift/protocol/TDebugProtocol.h>
-#include <vector>
 
 #include "exec/kudu-scanner.h"
 #include "exec/kudu-util.h"
-#include "exprs/expr.h"
 #include "gutil/gscoped_ptr.h"
-#include "gutil/strings/substitute.h"
-#include "gutil/stl_util.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
-#include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/disk-info.h"
-#include "util/jni-util.h"
-#include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
@@ -45,26 +34,11 @@
 DEFINE_int32(kudu_max_row_batches, 0, "The maximum size of the row batch queue, "
     " for Kudu scanners.");
 
-using boost::algorithm::to_lower_copy;
-using kudu::client::KuduClient;
-using kudu::client::KuduColumnSchema;
-using kudu::client::KuduPredicate;
-using kudu::client::KuduRowResult;
-using kudu::client::KuduSchema;
-using kudu::client::KuduTable;
-using kudu::client::KuduValue;
-using kudu::Slice;
-
 namespace impala {
 
-const string KuduScanNode::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
-const string KuduScanNode::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
-
 KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
-    : ScanNode(pool, tnode, descs),
-      tuple_id_(tnode.kudu_scan_node.tuple_id),
-      next_scan_token_idx_(0),
+    : KuduScanNodeBase(pool, tnode, descs),
       num_active_scanners_(0),
       done_(false),
       thread_avail_cb_id_(-1) {
@@ -85,48 +59,9 @@ KuduScanNode::~KuduScanNode() {
   DCHECK(is_closed());
 }
 
-Status KuduScanNode::Prepare(RuntimeState* state) {
-  RETURN_IF_ERROR(ScanNode::Prepare(state));
-  runtime_state_ = state;
-
-  scan_ranges_complete_counter_ =
-      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
-  kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
-  kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
-
-  DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
-
-  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
-
-  // Initialize the list of scan tokens to process from the TScanRangeParams.
-  DCHECK(scan_range_params_ != NULL);
-  int num_remote_tokens = 0;
-  for (const TScanRangeParams& params: *scan_range_params_) {
-    if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens;
-    scan_tokens_.push_back(params.scan_range.kudu_scan_token);
-  }
-  COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);
-  return Status::OK();
-}
-
 Status KuduScanNode::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-
-  const KuduTableDescriptor* table_desc =
-      static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
-
-  RETURN_IF_ERROR(CreateKuduClient(table_desc->kudu_master_addresses(), &client_));
-
-  uint64_t latest_ts = static_cast<uint64_t>(
-      max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
-  VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
-  if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
-
-  KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
-      "Unable to open Kudu table");
+  RETURN_IF_ERROR(KuduScanNodeBase::Open(state));
 
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
@@ -152,7 +87,9 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   SCOPED_TIMER(materialize_tuple_timer());
 
-  if (ReachedLimit() || scan_tokens_.empty()) {
+  // If there are no scan tokens, nothing is ever placed in the materialized
+  // row batch queue, so exit early for this case.
+  if (ReachedLimit() || NumScanTokens() == 0) {
     *eos = true;
     return Status::OK();
   }
@@ -187,8 +124,6 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
 void KuduScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
@@ -202,30 +137,14 @@ void KuduScanNode::Close(RuntimeState* state) {
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_scanners_, 0);
   materialized_row_batches_->Cleanup();
-  ExecNode::Close(state);
-}
-
-void KuduScanNode::DebugString(int indentation_level, stringstream* out) const {
-  string indent(indentation_level * 2, ' ');
-  *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
-}
-
-const string* KuduScanNode::GetNextScanToken() {
-  unique_lock<mutex> lock(lock_);
-  if (done_ || next_scan_token_idx_ >= scan_tokens_.size()) return nullptr;
-  const string* token = &scan_tokens_[next_scan_token_idx_++];
-  return token;
-}
-
-Status KuduScanNode::GetConjunctCtxs(vector<ExprContext*>* ctxs) {
-  return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs);
+  KuduScanNodeBase::Close(state);
 }
 
 void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
   while (true) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
-    if (done_ || next_scan_token_idx_ >= scan_tokens_.size()) break;
+    if (done_ || !HasScanToken()) break;
 
     // Check if we can get a token.
     if (!pool->TryAcquireThreadToken()) break;
@@ -234,7 +153,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     COUNTER_ADD(num_scanner_threads_started_counter_, 1);
 
     // Reserve the first token so no other thread picks it up.
-    const string* token = &scan_tokens_[next_scan_token_idx_++];
+    const string* token = GetNextScanToken();
     string name = Substitute("scanner-thread($0)",
         num_scanner_threads_started_counter_->value());
 
@@ -292,7 +211,12 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
           break;
         }
       }
-      scan_token = GetNextScanToken();
+      unique_lock<mutex> l(lock_);
+      if (!done_) {
+        scan_token = GetNextScanToken();
+      } else {
+        scan_token = nullptr;
+      }
     }
   }
   scanner.Close();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 3b896c8..6341cb6 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -22,8 +22,7 @@
 #include <gtest/gtest.h>
 #include <kudu/client/client.h>
 
-#include "exec/scan-node.h"
-#include "runtime/descriptors.h"
+#include "exec/kudu-scan-node-base.h"
 #include "runtime/thread-resource-mgr.h"
 #include "gutil/gscoped_ptr.h"
 #include "util/thread.h"
@@ -37,43 +36,19 @@ class KuduScanner;
 /// This takes a set of serialized Kudu scan tokens which encode the information needed
 /// for this scan. A Kudu client deserializes the tokens into kudu scanners, and those
 /// are used to retrieve the rows for this scan.
-class KuduScanNode : public ScanNode {
+class KuduScanNode : public KuduScanNodeBase {
  public:
   KuduScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
 
   ~KuduScanNode();
 
-  virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual void Close(RuntimeState* state);
 
- protected:
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
-
  private:
   friend class KuduScanner;
 
-  kudu::client::KuduClient* kudu_client() { return client_.get(); }
-
-  /// Tuple id resolved in Prepare() to set tuple_desc_.
-  const TupleId tuple_id_;
-
-  RuntimeState* runtime_state_;
-
-  /// Descriptor of tuples read from Kudu table.
-  const TupleDescriptor* tuple_desc_;
-
-  /// The Kudu client and table. Scanners share these instances.
-  kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
-  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
-
-  /// Set of scan tokens to be deserialized into Kudu scanners.
-  std::vector<std::string> scan_tokens_;
-
-  /// The next index in 'scan_tokens_' to be assigned. Protected by lock_.
-  int next_scan_token_idx_;
-
   // Outgoing row batches queue. Row batches are produced asynchronously by the scanner
   // threads and consumed by the main thread.
   boost::scoped_ptr<RowBatchQueue> materialized_row_batches_;
@@ -102,11 +77,6 @@ class KuduScanNode : public ScanNode {
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
 
-  RuntimeProfile::Counter* kudu_round_trips_;
-  RuntimeProfile::Counter* kudu_remote_tokens_;
-  static const std::string KUDU_ROUND_TRIPS;
-  static const std::string KUDU_REMOTE_TOKENS;
-
   /// The id of the callback added to the thread resource manager when a thread
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
@@ -126,18 +96,6 @@ class KuduScanNode : public ScanNode {
   /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or
   /// the limit is reached.
   Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token);
-
-  /// Returns the next scan token. Thread safe. Returns NULL if there are no more scan
-  /// tokens.
-  const std::string* GetNextScanToken();
-
-  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
-
-  // Returns a cloned copy of the scan node's conjuncts. Requires that the expressions
-  // have been open previously.
-  Status GetConjunctCtxs(vector<ExprContext*>* ctxs);
-
-  RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index d251eba..2495a58 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -58,7 +58,7 @@ namespace impala {
 
 const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 
-KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
+KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
     cur_kudu_batch_num_read_(0),
@@ -170,7 +170,7 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) {
 
 Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) {
   // Short-circuit the count(*) case.
-  if (scan_node_->tuple_desc_->slots().empty()) {
+  if (scan_node_->tuple_desc()->slots().empty()) {
     return HandleEmptyProjection(row_batch);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 8c8c663..90ff68d 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -21,7 +21,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <kudu/client/client.h>
 
-#include "exec/kudu-scan-node.h"
+#include "exec/kudu-scan-node-base.h"
 #include "runtime/descriptors.h"
 
 namespace impala {
@@ -36,7 +36,7 @@ class Tuple;
 /// by GetNext() until it reaches eos, and the caller may open another scan token.
 class KuduScanner {
  public:
-  KuduScanner(KuduScanNode* scan_node, RuntimeState* state);
+  KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state);
 
   /// Prepares this scanner for execution.
   /// Does not actually open a kudu::client::KuduScanner.
@@ -83,7 +83,7 @@ class KuduScanner {
     return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size());
   }
 
-  KuduScanNode* scan_node_;
+  KuduScanNodeBase* scan_node_;
   RuntimeState* state_;
 
   /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index b931808..2cc4818 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -19,7 +19,9 @@
 
 #include <boost/thread/lock_guard.hpp>
 #include <boost/thread/locks.hpp>
+#include <kudu/client/client.h>
 
+#include "exec/kudu-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/exec-env.h"
@@ -30,8 +32,13 @@
 
 #include "common/names.h"
 
+using boost::algorithm::join;
 using namespace impala;
 
+struct QueryState::KuduClientPtr {  // Holder type stored in kudu_client_map_.
+  kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
+};
+
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -166,3 +173,21 @@ FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_i
   auto it = fis_map_.find(instance_id);
   return it != fis_map_.end() ? it->second : nullptr;
 }
+
+Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresses,
+                                 kudu::client::KuduClient** client) {
+  // Clients are keyed by the comma-joined master address list.
+  std::string master_addr_concat = join(master_addresses, ",");
+  lock_guard<SpinLock> l(kudu_client_map_lock_);
+  auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
+  if (kudu_client_map_it == kudu_client_map_.end()) {
+    // No client for these masters yet. Own the new entry with a unique_ptr so that it
+    // is freed, not leaked, when CreateKuduClient() fails and we return early.
+    std::unique_ptr<KuduClientPtr> kudu_client_ptr(new KuduClientPtr);
+    RETURN_IF_ERROR(CreateKuduClient(master_addresses, &kudu_client_ptr->kudu_client));
+    kudu_client_map_it =
+        kudu_client_map_.emplace(master_addr_concat, std::move(kudu_client_ptr)).first;
+  }
+  *client = kudu_client_map_it->second->kudu_client.get();
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 650e8bf..d7ce10f 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -30,6 +30,8 @@
 #include "util/spinlock.h"
 #include "util/uid-util.h"
 
+namespace kudu { namespace client { class KuduClient; } }
+
 namespace impala {
 
 class FragmentInstanceState;
@@ -115,6 +117,13 @@ class QueryState {
   /// Must be called before destroying the QueryState.
   void ReleaseResources();
 
+  /// Gets a KuduClient for this list of master addresses. It will look up and share
+  /// an existing KuduClient if possible. Otherwise, it will create a new KuduClient
+  /// internally and return a pointer to it. All KuduClients accessed through this
+  /// interface are owned by the QueryState. Thread safe.
+  Status GetKuduClient(const std::vector<std::string>& master_addrs,
+                       kudu::client::KuduClient** client);
+
   ~QueryState();
 
  private:
@@ -159,6 +168,22 @@ class QueryState {
   /// TODO: this will always be non-null once IMPALA-3200 is done
   TmpFileMgr::FileGroup* file_group_;
 
+  SpinLock kudu_client_map_lock_; // protects kudu_client_map_
+
+  /// Opaque type for storing the pointer to the KuduClient. This allows us
+  /// to avoid including Kudu header files.
+  struct KuduClientPtr;
+
+  /// Map from the master addresses string for a Kudu table to the KuduClientPtr for
+  /// accessing that table. The master address string is constructed by joining
+  /// the master address list entries with a comma separator.
+  typedef std::unordered_map<std::string, std::unique_ptr<KuduClientPtr>> KuduClientMap;
+
+  /// Map for sharing KuduClients between fragment instances. Each Kudu table has
+  /// a list of master addresses stored in the Hive Metastore. This map requires
+  /// that the master address lists be identical in order to share a KuduClient.
+  KuduClientMap kudu_client_map_;
+
   /// Create QueryState w/ copy of query_ctx and refcnt of 0.
   /// The query is associated with the resource pool named 'pool'
   QueryState(const TQueryCtx& query_ctx, const std::string& pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 25e693c..19a654d 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -254,6 +254,10 @@ struct THBaseScanNode {
 
 struct TKuduScanNode {
   1: required Types.TTupleId tuple_id
+
+  // Indicates whether the MT scan node implementation should be used.
+  // If this is true, then the MT_DOP query option must be > 0.
+  2: optional bool use_mt_scan_node
 }
 
 struct TEqJoinCondition {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 8e96bcd..7cd6e7c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -84,6 +84,9 @@ public class KuduScanNode extends ScanNode {
 
   private final KuduTable kuduTable_;
 
+  // True if this scan node should use the MT implementation in the backend.
+  private boolean useMtScanNode_;
+
   // Indexes for the set of hosts that will be used for the query.
   // From analyzer.getHostIndex().getIndex(address)
   private final Set<Integer> hostIndexSet_ = Sets.newHashSet();
@@ -135,6 +138,14 @@ public class KuduScanNode extends ScanNode {
       throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
     }
 
+    // Determine backend scan node implementation to use.
+    if (analyzer.getQueryOptions().isSetMt_dop() &&
+        analyzer.getQueryOptions().mt_dop > 0) {
+      useMtScanNode_ = true;
+    } else {
+      useMtScanNode_ = false;
+    }
+
     computeStats(analyzer);
   }
 
@@ -293,6 +304,7 @@ public class KuduScanNode extends ScanNode {
   protected void toThrift(TPlanNode node) {
     node.node_type = TPlanNodeType.KUDU_SCAN_NODE;
     node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt());
+    node.kudu_scan_node.setUse_mt_scan_node(useMtScanNode_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/testdata/workloads/functional-query/queries/QueryTest/mt-dop-kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-kudu.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-kudu.test
new file mode 100644
index 0000000..d7d8bf8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-kudu.test
@@ -0,0 +1,39 @@
+====
+---- QUERY
+# Make sure LIMIT is enforced.
+select * from functional_kudu.dimtbl order by id limit 1;
+---- RESULTS
+1001,'Name1',94611
+---- TYPES
+BIGINT, STRING, INT
+====
+---- QUERY
+# Make sure that we can list the columns to be scanned in any order, that predicates
+# work and that we can have predicates on columns not referenced elsewhere.
+select zip, id from functional_kudu.dimtbl where id >= 1000 and 1002 >= id and
+94611 = zip and 'Name1' = name order by id;
+---- RESULTS
+94611,1001
+---- TYPES
+INT, BIGINT
+====
+---- QUERY
+SELECT a FROM functional_kudu.tinytable UNION ALL SELECT a FROM functional_kudu.tinytable;
+---- RESULTS
+'aaaaaaa'
+'ccccc'
+'eeeeeeee'
+'aaaaaaa'
+'ccccc'
+'eeeeeeee'
+---- TYPES
+STRING
+====
+---- QUERY
+# IMPALA-4408: Test Kudu scans where all materialized slots are non-nullable.
+select count(int_col) from functional_kudu.tinyinttable;
+---- RESULTS
+10
+---- TYPES
+BIGINT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb988b1/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 6ba8184..1277002 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -21,6 +21,7 @@ import pytest
 
 from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_vector import ImpalaTestDimension
 
@@ -103,3 +104,13 @@ class TestMtDopParquet(ImpalaTestSuite):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/parquet-filtering', vector)
 
+class TestMtDopKudu(KuduTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDopKudu, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
+
+  def test_kudu(self, vector, unique_database):
+    vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop-kudu', vector, use_db=unique_database)
+