You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:32 UTC

[2/7] incubator-impala git commit: IMPALA-3481: Use Kudu ScanToken API for scan ranges

IMPALA-3481: Use Kudu ScanToken API for scan ranges

Switches the planner and KuduScanNode to use Kudu's new
ScanToken API instead of explicitly constructing scan ranges
for all tablets of a table, regardless of whether they were
needed. The ScanToken API allows Impala to specify the
projected columns and predicates during planning, and Kudu
returns a set of 'scan tokens' that represent a scanner for
each tablet that needs to be scanned. The scan tokens can
be serialized and distributed to the scan nodes, which can
then deserialize them into Kudu scanner objects. Upon
deserialization, the scan token already carries all scan
parameters, including the pushed-down predicates, so Impala
no longer needs to send the Kudu predicates to the BE and
convert them at the scan node.

This change also fixes:
1) IMPALA-4016: Avoid materializing slots only referenced
                by Kudu conjuncts
2) IMPALA-3874: Predicates are not always pushed to Kudu

TODO: Consider additional planning improvements.

Testing: Updated the existing tests and verified that
everything works as expected. Some BE tests no longer
apply, so they were removed.

TODO: When KUDU-1065 is resolved, add tests that demonstrate pruning.

Change-Id: I160e5849d372755748ff5ba3c90a4651c804b220
Reviewed-on: http://gerrit.cloudera.org:8080/4120
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/157c8005
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/157c8005
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/157c8005

Branch: refs/heads/master
Commit: 157c80056c62c89193a04d147d8c94fcb58610c4
Parents: 94ef5b1
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Fri Aug 19 08:49:25 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Sep 8 01:50:51 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node-test.cc              | 358 +++--------------
 be/src/exec/kudu-scan-node.cc                   | 247 ++----------
 be/src/exec/kudu-scan-node.h                    | 108 ++---
 be/src/exec/kudu-scanner.cc                     | 101 ++---
 be/src/exec/kudu-scanner.h                      |  60 ++-
 be/src/exec/kudu-util.cc                        | 132 +------
 be/src/exec/kudu-util.h                         |  27 +-
 be/src/scheduling/simple-scheduler.cc           |   2 +-
 bin/impala-config.sh                            |   2 +-
 common/thrift/PlanNodes.thrift                  |  13 +-
 fe/pom.xml                                      |   2 +-
 .../java/com/cloudera/impala/analysis/Expr.java |   4 +-
 .../cloudera/impala/analysis/LiteralExpr.java   |   6 +-
 .../com/cloudera/impala/catalog/KuduTable.java  |   6 +-
 .../catalog/delegates/KuduDdlDelegate.java      |  16 +-
 .../cloudera/impala/planner/KuduScanNode.java   | 392 ++++++++++---------
 .../java/com/cloudera/impala/util/KuduUtil.java |  10 +-
 .../impala/planner/KuduPlannerTest.java         |  52 ---
 .../cloudera/impala/planner/PlannerTest.java    |  20 +
 .../impala/planner/PlannerTestBase.java         |  37 +-
 .../com/cloudera/impala/util/KuduUtilTest.java  |  10 +-
 .../queries/PlannerTest/kudu-selectivity.test   |   4 +-
 .../queries/PlannerTest/kudu-update.test        |  12 +-
 .../queries/PlannerTest/kudu.test               |  51 +--
 .../queries/PlannerTest/tpch-kudu.test          |  35 +-
 25 files changed, 522 insertions(+), 1185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 0cda0ec..972ae80 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -37,12 +37,6 @@
 #include "util/cpu-info.h"
 #include "util/test-info.h"
 
-DEFINE_bool(run_scan_bench, false, "Whether to run the Kudu scan micro benchmarks. Note:"
-    " These tests are very slow.");
-DEFINE_bool(bench_strings, true, "Whether to scan string columns in benchmarks.");
-DEFINE_int32(bench_num_rows, 10 * 1000 * 1000, "Num rows to insert in benchmarks.");
-DEFINE_int32(bench_num_splits, 40, "Num tablets to create in benchmarks");
-DEFINE_int32(bench_num_runs, 10, "Num times to run each benchmark.");
 DEFINE_bool(skip_delete_table, false, "Skips deleting the tables at the end of the tests.");
 DEFINE_string(use_existing_table, "", "The name of the existing table to use.");
 DECLARE_bool(pick_only_leaders_for_tests);
@@ -56,6 +50,7 @@ static const char* BASE_TABLE_NAME = "TestScanNodeTable";
 // The id of the slot that contains the key, in the test schema.
 const int KEY_SLOT_ID = 1;
 
+// TODO: Remove after the Kudu EE tests are providing sufficient coverage (IMPALA-3718).
 class KuduScanNodeTest : public testing::Test {
  public:
   virtual void SetUp() {
@@ -94,8 +89,6 @@ class KuduScanNodeTest : public testing::Test {
     runtime_state_->InitMemTrackers(TUniqueId(), NULL, -1);
 
     TKuduScanNode kudu_scan_node_;
-    kudu_scan_node_.__set_kudu_conjuncts(pushable_conjuncts_);
-
     kudu_node_.reset(new TPlanNode);
 
     kudu_node_->__set_kudu_scan_node(kudu_scan_node_);
@@ -176,9 +169,41 @@ class KuduScanNodeTest : public testing::Test {
     }
   };
 
-  void ScanAndVerify(const vector<TScanRangeParams>& params, int first_row,
-      int expected_num_rows, int num_cols_to_materialize,
+  void CreateScanRangeParams(int num_cols_to_materialize,
+      vector<TScanRangeParams>* params) {
+    DCHECK_GE(num_cols_to_materialize, 0);
+    DCHECK_LE(num_cols_to_materialize, 3);
+
+    kudu::client::KuduScanTokenBuilder b(kudu_test_helper_.table().get());
+    vector<string> col_names;
+    if (num_cols_to_materialize > 0) col_names.push_back("key");
+    if (num_cols_to_materialize > 1) col_names.push_back("int_val");
+    if (num_cols_to_materialize > 2) col_names.push_back("string_val");
+    KUDU_ASSERT_OK(b.SetProjectedColumnNames(col_names));
+
+    vector<kudu::client::KuduScanToken*> tokens;
+    KUDU_ASSERT_OK(b.Build(&tokens));
+
+    for (kudu::client::KuduScanToken* token: tokens) {
+      TScanRange range;
+      string buf;
+      KUDU_ASSERT_OK(token->Serialize(&buf));
+      range.__set_kudu_scan_token(buf);
+
+      TScanRangeParams scan_range_params;
+      scan_range_params.__set_is_cached(false);
+      scan_range_params.__set_is_remote(false);
+      scan_range_params.__set_volume_id(0);
+      scan_range_params.__set_scan_range(range);
+      params->push_back(scan_range_params);
+    }
+  }
+
+  void ScanAndVerify(int first_row, int expected_num_rows, int num_cols_to_materialize,
       int num_notnull_cols = 3, int limit = NO_LIMIT, bool verify = true) {
+    vector<TScanRangeParams> params;
+    CreateScanRangeParams(num_cols_to_materialize, &params);
+
     BuildRuntimeStateForScans(num_cols_to_materialize);
     if (limit != NO_LIMIT) kudu_node_->__set_limit(limit);
 
@@ -220,49 +245,6 @@ class KuduScanNodeTest : public testing::Test {
     scanner.Close(runtime_state_.get());
   }
 
-  // Adds a new TScanRangeParams to 'params'.
-  // If start_key and/or stop_key are '-1' an empty key string is passed
-  // instead.
-  void AddScanRange(int start_key, int stop_key,
-      vector<TScanRangeParams>* params) {
-
-    string encoded_start_key;
-    if (start_key != -1) {
-      scoped_ptr<KuduPartialRow> start_key_row(kudu_test_helper_.test_schema().NewRow());
-      KUDU_ASSERT_OK(start_key_row->SetInt32(0, start_key));
-      start_key_row->EncodeRowKey(&encoded_start_key);
-    } else {
-      encoded_start_key = "";
-    }
-
-    string encoded_stop_key;
-    if (stop_key != -1) {
-      gscoped_ptr<KuduPartialRow> stop_key_row(kudu_test_helper_.test_schema().NewRow());
-      KUDU_ASSERT_OK(stop_key_row->SetInt32(0, stop_key));
-      stop_key_row->EncodeRowKey(&encoded_stop_key);
-    } else {
-      encoded_stop_key = "";
-    }
-
-    TKuduKeyRange kudu_key_range;
-    kudu_key_range.__set_range_start_key(encoded_start_key);
-    kudu_key_range.__set_range_stop_key(encoded_stop_key);
-
-    TScanRange scan_range;
-    scan_range.__set_kudu_key_range(kudu_key_range);
-
-    TScanRangeParams scan_range_params;
-    scan_range_params.__set_is_cached(false);
-    scan_range_params.__set_is_remote(false);
-    scan_range_params.__set_volume_id(0);
-    scan_range_params.__set_scan_range(scan_range);
-
-    params->push_back(scan_range_params);
-  }
-
-  void DoBenchmarkScan(const string& name, const vector<TScanRangeParams>& params,
-                       int num_cols);
-
  protected:
   KuduTestHelper kudu_test_helper_;
   boost::scoped_ptr<MemTracker> mem_tracker_;
@@ -272,7 +254,6 @@ class KuduScanNodeTest : public testing::Test {
   boost::scoped_ptr<TPlanNode> kudu_node_;
   TTableDescriptor t_tbl_desc_;
   DescriptorTbl* desc_tbl_;
-  vector<TExpr> pushable_conjuncts_;
 };
 
 TEST_F(KuduScanNodeTest, TestScanNode) {
@@ -286,17 +267,10 @@ TEST_F(KuduScanNodeTest, TestScanNode) {
                                    NUM_ROWS);
 
 
-  // Test having multiple scan ranges, the range is split by tablet boundaries. Default
-  // split is at '5'.
-  int mid_key = 5;
-  vector<TScanRangeParams> params;
-  AddScanRange(FIRST_ROW, mid_key, &params);
-  AddScanRange(mid_key, NUM_ROWS, &params);
-
   // Test materializing all, some, or none of the slots.
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 3);
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 2);
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 0, /* 0 non-null cols */0, NO_LIMIT,
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 3);
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 2);
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 0, /* 0 non-null cols */0, NO_LIMIT,
       /* Don't verify */false);
 }
 
@@ -311,164 +285,10 @@ TEST_F(KuduScanNodeTest, TestScanNullColValues) {
                                    kudu_test_helper_.table().get(),
                                    NUM_ROWS, 0, 1);
 
-  vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
-
   // Try scanning including and not including the null columns.
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 3, 1);
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 2, 1);
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 1, 1);
-}
-
-namespace {
-
-// Sets a binary predicate, based on 'type', to the plan node. This will be transformed
-// into a range predicate by KuduScanNode.
-void AddExpressionNodesToExpression(TExpr* expression, int slot_id, const string& op_name,
-    TExprNodeType::type constant_type, const void* value) {
-
-  vector<TExprNode> nodes;
-
-  // Build the op node.
-  TExprNode function_node;
-  function_node.__set_node_type(TExprNodeType::FUNCTION_CALL);
-  TFunctionName function_name;
-  function_name.__set_function_name(op_name);
-  TFunction function;
-  function.__set_name(function_name);
-  function_node.__set_fn(function);
-
-  nodes.push_back(function_node);
-
-  // Add the slof ref.
-  TExprNode slot_ref_node;
-  slot_ref_node.__set_node_type(TExprNodeType::SLOT_REF);
-  TSlotRef slot_ref;
-  slot_ref.__set_slot_id(slot_id);
-  slot_ref_node.__set_slot_ref(slot_ref);
-
-  nodes.push_back(slot_ref_node);
-
-  TExprNode constant_node;
-  constant_node.__set_node_type(constant_type);
-  // Add the constant part.
-  switch(constant_type) {
-    // We only add the boilerplate for the types used in the test schema.
-    case TExprNodeType::INT_LITERAL: {
-      TIntLiteral int_literal;
-      int_literal.__set_value(*reinterpret_cast<const int64_t*>(value));
-      constant_node.__set_int_literal(int_literal);
-      break;
-    }
-    case TExprNodeType::STRING_LITERAL: {
-      TStringLiteral string_literal;
-      string_literal.__set_value(*reinterpret_cast<const string*>(value));
-      constant_node.__set_string_literal(string_literal);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Unsupported function type";
-  }
-
-  nodes.push_back(constant_node);
-  expression->__set_nodes(nodes);
-}
-
-} // anonymous namespace
-
-
-// Test a >= predicate on the Key.
-TEST_F(KuduScanNodeTest, TestPushIntGEPredicateOnKey) {
-  kudu_test_helper_.CreateTable(BASE_TABLE_NAME);
-  const int NUM_ROWS_TO_INSERT = 1000;
-  const int SLOT_ID = 1;
-  const int MAT_COLS = 3;
-  const int FIRST_ROW = 500;
-  const int EXPECTED_NUM_ROWS = 500;
-
-  // Insert kNumRows rows for this test.
-  kudu_test_helper_.InsertTestRows(kudu_test_helper_.client().get(),
-                                   kudu_test_helper_.table().get(),
-                                   NUM_ROWS_TO_INSERT);
-
-  const int64_t PREDICATE_VALUE = 500;
-
-  // Test having a pushable predicate on the key (key >= PREDICATE_VALUE).
-  TExpr conjunct;
-  AddExpressionNodesToExpression(&conjunct, SLOT_ID, KuduScanNode::GE_FN,
-      TExprNodeType::INT_LITERAL, &PREDICATE_VALUE);
-  pushable_conjuncts_.push_back(conjunct);
-
-  vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
-
-  ScanAndVerify(params, FIRST_ROW, EXPECTED_NUM_ROWS, MAT_COLS);
-}
-
-// Test a == predicate on the 2nd column.
-TEST_F(KuduScanNodeTest, TestPushIntEQPredicateOn2ndColumn) {
-  kudu_test_helper_.CreateTable(BASE_TABLE_NAME);
-  const int NUM_ROWS_TO_INSERT = 1000;
-  const int SLOT_ID = 2;
-  const int MAT_COLS = 3;
-
-  const int FIRST_ROW = 500;
-  const int EXPECTED_NUM_ROWS = 1;
-
-  // Insert kNumRows rows for this test.
-  kudu_test_helper_.InsertTestRows(kudu_test_helper_.client().get(),
-                                   kudu_test_helper_.table().get(),
-                                   NUM_ROWS_TO_INSERT);
-
-  // When rows are added col2 is 2 * col1 so we multiply by two to get back
-  // first row = 500.
-  const int64_t PREDICATE_VAL = FIRST_ROW * 2;
-
-  // Now test having a pushable predicate on the 2nd column.
-  TExpr conjunct;
-  AddExpressionNodesToExpression(&conjunct, SLOT_ID, KuduScanNode::EQ_FN,
-      TExprNodeType::INT_LITERAL, &PREDICATE_VAL);
-  pushable_conjuncts_.push_back(conjunct);
-
-  vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
-
-  ScanAndVerify(params, FIRST_ROW, EXPECTED_NUM_ROWS, MAT_COLS);
-}
-
-TEST_F(KuduScanNodeTest, TestPushStringLEPredicateOn3rdColumn) {
-  kudu_test_helper_.CreateTable(BASE_TABLE_NAME);
-  const int NUM_ROWS_TO_INSERT = 1000;
-  const int SLOT_ID = 3;
-  const int MAT_COLS = 3;
-  // This predicate won't return consecutive rows so we just assert on the count.
-  const bool VERIFY_ROWS = false;
-
-  const int FIRST_ROW = 0;
-  const string PREDICATE_VAL = "hello_500";
-
-  // We expect 448 rows as Kudu will lexicographically compare the predicate value to
-  // column values. We can expect to obtain rows 1-500, inclusive, except numbers
-  // 6-9 (hello_6 > hello_500) and numbers 51-99 for a total of 52 excluded numbers.
-  const int EXPECTED_NUM_ROWS = 448;
-
-
-  // Insert kNumRows rows for this test.
-  kudu_test_helper_.InsertTestRows(kudu_test_helper_.client().get(),
-                                   kudu_test_helper_.table().get(),
-                                   NUM_ROWS_TO_INSERT);
-
-  // Now test having a pushable predicate on the third column.
-  TExpr conjunct;
-  AddExpressionNodesToExpression(&conjunct, SLOT_ID, KuduScanNode::LE_FN,
-      TExprNodeType::STRING_LITERAL, &PREDICATE_VAL);
-  pushable_conjuncts_.push_back(conjunct);
-
-  vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
-
-  ScanAndVerify(params, FIRST_ROW, EXPECTED_NUM_ROWS, MAT_COLS, MAT_COLS,
-                NO_LIMIT, VERIFY_ROWS);
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 3, 1);
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 2, 1);
+  ScanAndVerify(FIRST_ROW, NUM_ROWS, 1, 1);
 }
 
 // Test for a bug where we would mishandle getting an empty string from
@@ -485,15 +305,21 @@ TEST_F(KuduScanNodeTest, TestScanEmptyString) {
   KUDU_ASSERT_OK(session->Flush());
   ASSERT_FALSE(session->HasPendingOperations());
 
-  BuildRuntimeStateForScans(3);
+  int num_cols_to_materialize = 3;
+  BuildRuntimeStateForScans(num_cols_to_materialize);
   KuduScanNode scanner(obj_pool_.get(), *kudu_node_, *desc_tbl_);
+
   vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
+  CreateScanRangeParams(num_cols_to_materialize, &params);
   SetUpScanner(&scanner, params);
   bool eos = false;
   RowBatch* batch = obj_pool_->Add(
       new RowBatch(scanner.row_desc(), DEFAULT_ROWS_PER_BATCH, mem_tracker_.get()));
-  ASSERT_OK(scanner.GetNext(runtime_state_.get(), batch, &eos));
+  for (int i = 0; i < 2 && batch->num_rows() == 0; ++i) {
+    // Allow for up to 2 empty row batches since there are scanners created for all
+    // tablets (1 split point), and only one row was inserted.
+    ASSERT_OK(scanner.GetNext(runtime_state_.get(), batch, &eos));
+  }
   ASSERT_EQ(1, batch->num_rows());
 
   ASSERT_OK(scanner.GetNext(runtime_state_.get(), NULL, &eos));
@@ -513,91 +339,13 @@ TEST_F(KuduScanNodeTest, TestLimitsAreEnforced) {
                                    kudu_test_helper_.table().get(),
                                    NUM_ROWS);
 
-  vector<TScanRangeParams> params;
-  AddScanRange(-1, -1, &params);
-
   // Try scanning but limit the number of returned rows to several different values.
   int limit_rows_to = 0;
-  ScanAndVerify(params, FIRST_ROW, limit_rows_to, 3, 3, limit_rows_to);
+  ScanAndVerify(FIRST_ROW, limit_rows_to, 3, 3, limit_rows_to);
   limit_rows_to = 1;
-  ScanAndVerify(params, FIRST_ROW, limit_rows_to, 3, 3, limit_rows_to);
+  ScanAndVerify(FIRST_ROW, limit_rows_to, 3, 3, limit_rows_to);
   limit_rows_to = 2000;
-  ScanAndVerify(params, FIRST_ROW, 1000, 3, 3, limit_rows_to);
-}
-
-void KuduScanNodeTest::DoBenchmarkScan(const string& name,
-    const vector<TScanRangeParams>& params, int num_cols) {
-
-  const int NUM_ROWS = FLAGS_bench_num_rows;
-  const int FIRST_ROW = 0;
-
-  double avg = 0;
-  int num_runs = 0;
-
-  LOG(INFO) << "===== Starting benchmark: " << name;
-  for (int i = 0; i < FLAGS_bench_num_runs; ++i) {
-    MonotonicStopWatch watch;
-    watch.Start();
-    ScanAndVerify(params, FIRST_ROW, NUM_ROWS, num_cols, 3, NO_LIMIT, false);
-    watch.Stop();
-    int64_t total = watch.ElapsedTime();
-    avg += total;
-    ++num_runs;
-    LOG(INFO) << "Run took: " << (total / (1000 * 1000)) << " msecs.";
-  }
-
-  avg = (avg / num_runs) / (1000 * 1000);
-  LOG(INFO) << "===== Benchmark: " << name << " took(avg): " << avg << " msecs.";
-}
-
-TEST_F(KuduScanNodeTest, BenchmarkScanNode) {
-  if (!FLAGS_run_scan_bench) return;
-
-  // Generate some more splits and insert a lot more rows.
-  const int NUM_ROWS = FLAGS_bench_num_rows;
-  const int NUM_SPLITS = FLAGS_bench_num_splits;
-  const int FIRST_ROW = 0;
-
-  if (FLAGS_use_existing_table == "") {
-    vector<const KuduPartialRow*> split_rows;
-    for (int i = 1; i < NUM_SPLITS; ++i) {
-       int split_key = (NUM_ROWS / NUM_SPLITS) * i;
-       KuduPartialRow* row = kudu_test_helper_.test_schema().NewRow();
-       KUDU_ASSERT_OK(row->SetInt32(0, split_key));
-       split_rows.push_back(row);
-    }
-    kudu_test_helper_.CreateTable(BASE_TABLE_NAME, &split_rows);
-    // Insert NUM_ROWS rows for this test.
-    kudu_test_helper_.InsertTestRows(kudu_test_helper_.client().get(),
-                                     kudu_test_helper_.table().get(),
-                                     NUM_ROWS);
-    LOG(INFO) << "Inserted: " << NUM_ROWS << " rows into " << NUM_SPLITS << " tablets.";
-  } else {
-    kudu_test_helper_.OpenTable(FLAGS_use_existing_table);
-  }
-
-  // TODO We calculate the scan ranges based on the test params and not by actually
-  // querying the tablet servers since Kudu doesn't have an API to get them.
-  vector<TScanRangeParams> params;
-  int previous_split_key = -1;
-  for (int i = 1; i < NUM_SPLITS; ++i) {
-     int split_key = (NUM_ROWS / NUM_SPLITS) * i;
-     AddScanRange(previous_split_key, split_key, &params);
-     previous_split_key = split_key;
-     if (i == NUM_SPLITS -1) {
-       AddScanRange(split_key, -1, &params);
-     }
-  }
-
-  LOG(INFO) << "Warming up scans.";
-
-  // Scan all columns once to warmup.
-  ScanAndVerify(params, FIRST_ROW, NUM_ROWS, 3, 3, NO_LIMIT, false);
-
-  DoBenchmarkScan("No cols", params, 0);
-  DoBenchmarkScan("Key only", params, 1);
-  DoBenchmarkScan("Int cols", params, 2);
-  DoBenchmarkScan("All cols with strings", params, 3);
+  ScanAndVerify(FIRST_ROW, 1000, 3, 3, limit_rows_to);
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index a7a068c..b247c67 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -65,25 +65,29 @@ namespace impala {
 const string KuduScanNode::KUDU_READ_TIMER = "TotalKuduReadTime";
 const string KuduScanNode::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
 
-const std::string KuduScanNode::GE_FN = "ge";
-const std::string KuduScanNode::LE_FN = "le";
-const std::string KuduScanNode::EQ_FN = "eq";
-
 KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
       tuple_id_(tnode.kudu_scan_node.tuple_id),
-      next_scan_range_idx_(0),
+      next_scan_token_idx_(0),
       num_active_scanners_(0),
       done_(false),
-      pushable_conjuncts_(tnode.kudu_scan_node.kudu_conjuncts),
       thread_avail_cb_id_(-1) {
   DCHECK(KuduIsAvailable());
+
+  int max_row_batches = FLAGS_kudu_max_row_batches;
+  if (max_row_batches <= 0) {
+    // TODO: See comment on hdfs-scan-node.
+    // This value is built the same way as it assumes that the scan node runs co-located
+    // with a Kudu tablet server and that the tablet server is using disks similarly as
+    // a datanode would.
+    max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
+  }
+  materialized_row_batches_.reset(new RowBatchQueue(max_row_batches));
 }
 
 KuduScanNode::~KuduScanNode() {
   DCHECK(is_closed());
-  STLDeleteElements(&kudu_predicates_);
 }
 
 Status KuduScanNode::Prepare(RuntimeState* state) {
@@ -100,24 +104,11 @@ Status KuduScanNode::Prepare(RuntimeState* state) {
 
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
 
-  // Convert TScanRangeParams to ScanRanges.
-  CHECK(scan_range_params_ != NULL)
-      << "Must call SetScanRanges() before calling Prepare()";
+  // Initialize the list of scan tokens to process from the TScanRangeParams.
+  DCHECK(scan_range_params_ != NULL);
   for (const TScanRangeParams& params: *scan_range_params_) {
-    const TKuduKeyRange& key_range = params.scan_range.kudu_key_range;
-    key_ranges_.push_back(key_range);
+    scan_tokens_.push_back(params.scan_range.kudu_scan_token);
   }
-  max_materialized_row_batches_ = FLAGS_kudu_max_row_batches;
-
-  if (max_materialized_row_batches_ <= 0) {
-    // TODO: See comment on hdfs-scan-node.
-    // This value is built the same way as it assumes that the scan node runs co-located
-    // with a Kudu tablet server and that the tablet server is using disks similarly as
-    // a datanode would.
-    max_materialized_row_batches_ =
-        10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS);
-  }
-  materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
   return Status::OK();
 }
 
@@ -139,15 +130,11 @@ Status KuduScanNode::Open(RuntimeState* state) {
 
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
       "Unable to open Kudu table");
-  RETURN_IF_ERROR(ProjectedColumnsFromTupleDescriptor(*tuple_desc_, &projected_columns_,
-      table_->schema()));
-  // Must happen after table_ is opened.
-  RETURN_IF_ERROR(TransformPushableConjunctsToRangePredicates());
 
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  // Reserve one thread token.
+  // Reserve one thread.
   state->resource_pool()->ReserveOptionalTokens(1);
   if (state->query_options().num_scanner_threads > 0) {
     state->resource_pool()->set_max_quota(
@@ -155,8 +142,8 @@ Status KuduScanNode::Open(RuntimeState* state) {
   }
 
   thread_avail_cb_id_ = state->resource_pool()->AddThreadAvailableCb(
-      bind<void>(mem_fn(&KuduScanNode::ThreadTokenAvailableCb), this, _1));
-  ThreadTokenAvailableCb(state->resource_pool());
+      bind<void>(mem_fn(&KuduScanNode::ThreadAvailableCb), this, _1));
+  ThreadAvailableCb(state->resource_pool());
   return Status::OK();
 }
 
@@ -167,7 +154,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   SCOPED_TIMER(materialize_tuple_timer());
 
-  if (ReachedLimit() || key_ranges_.empty()) {
+  if (ReachedLimit() || scan_tokens_.empty()) {
     *eos = true;
     return Status::OK();
   }
@@ -203,145 +190,6 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   return status;
 }
 
-Status KuduScanNode::TransformPushableConjunctsToRangePredicates() {
-  // The only supported pushable predicates are binary operators with a SlotRef and a
-  // literal as the left and right operands respectively.
-  for (const TExpr& predicate: pushable_conjuncts_) {
-    DCHECK_EQ(predicate.nodes.size(), 3);
-    const TExprNode& function_call = predicate.nodes[0];
-
-    DCHECK_EQ(function_call.node_type, TExprNodeType::FUNCTION_CALL);
-
-    IdxByLowerCaseColName idx_by_lc_name;
-    RETURN_IF_ERROR(MapLowercaseKuduColumnNamesToIndexes(table_->schema(),
-        &idx_by_lc_name));
-
-    string impala_col_name;
-    GetSlotRefColumnName(predicate.nodes[1], &impala_col_name);
-
-    IdxByLowerCaseColName::const_iterator iter;
-    if ((iter = idx_by_lc_name.find(impala_col_name)) ==
-        idx_by_lc_name.end()) {
-      return Status(Substitute("Could not find col: '$0' in the table schema",
-                               impala_col_name));
-    }
-
-    KuduColumnSchema column = table_->schema().Column(iter->second);
-
-    KuduValue* bound;
-    RETURN_IF_ERROR(GetExprLiteralBound(predicate.nodes[2], column.type(), &bound));
-    DCHECK(bound != NULL);
-
-    const string& function_name = function_call.fn.name.function_name;
-    if (function_name == GE_FN) {
-      kudu_predicates_.push_back(table_->NewComparisonPredicate(column.name(),
-          KuduPredicate::GREATER_EQUAL, bound));
-    } else if (function_name == LE_FN) {
-      kudu_predicates_.push_back(table_->NewComparisonPredicate(column.name(),
-          KuduPredicate::LESS_EQUAL, bound));
-    } else if (function_name == EQ_FN) {
-      kudu_predicates_.push_back(table_->NewComparisonPredicate(column.name(),
-          KuduPredicate::EQUAL, bound));
-    } else {
-      DCHECK(false) << "Received unpushable operator to push down: " << function_name;
-    }
-  }
-  return Status::OK();
-}
-
-void KuduScanNode::GetSlotRefColumnName(const TExprNode& node, string* col_name) {
-  const KuduTableDescriptor* table_desc =
-      static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
-  TSlotId slot_id = node.slot_ref.slot_id;
-  for (SlotDescriptor* slot: tuple_desc_->slots()) {
-    if (slot->id() == slot_id) {
-      int col_idx = slot->col_pos();
-      *col_name = table_desc->col_descs()[col_idx].name();
-      return;
-    }
-  }
-
-  DCHECK(false) << "Could not find a slot with slot id: " << slot_id;
-}
-
-namespace {
-
-typedef std::map<int, const char*> TypeNamesMap;
-
-// Gets the name of an Expr node type.
-string NodeTypeToString(TExprNodeType::type type) {
-  const TypeNamesMap& type_names_map =
-      impala::_TExprNodeType_VALUES_TO_NAMES;
-  TypeNamesMap::const_iterator iter = type_names_map.find(type);
-
-  if (iter == type_names_map.end()) {
-    return Substitute("Unknown type: $0", type);
-  }
-
-  return (*iter).second;
-}
-
-} // anonymous namespace
-
-Status KuduScanNode::GetExprLiteralBound(const TExprNode& node,
-    KuduColumnSchema::DataType type, KuduValue** value) {
-
-  // Build the Kudu values based on the type of the Kudu column.
-  // We're restrictive regarding which types we accept as the planner does all casting
-  // in the frontend and predicates only get pushed down if the types match.
-  switch (type) {
-    // For types BOOL and STRING we expect the expression literal to match perfectly.
-    case kudu::client::KuduColumnSchema::BOOL: {
-      if (node.node_type != TExprNodeType::BOOL_LITERAL) {
-        return Status(Substitute("Cannot create predicate over column of type BOOL with "
-            "value of type: $0", NodeTypeToString(node.node_type)));
-      }
-      *value = KuduValue::FromBool(node.bool_literal.value);
-      return Status::OK();
-    }
-    case kudu::client::KuduColumnSchema::STRING: {
-      if (node.node_type != TExprNodeType::STRING_LITERAL) {
-        return Status(Substitute("Cannot create predicate over column of type STRING"
-            " with value of type: $0", NodeTypeToString(node.node_type)));
-      }
-      *value = KuduValue::CopyString(node.string_literal.value);
-      return Status::OK();
-    }
-    case kudu::client::KuduColumnSchema::INT8:
-    case kudu::client::KuduColumnSchema::INT16:
-    case kudu::client::KuduColumnSchema::INT32:
-    case kudu::client::KuduColumnSchema::INT64: {
-      if (node.node_type != TExprNodeType::INT_LITERAL) {
-        return Status(Substitute("Cannot create predicate over column of type INT with "
-            "value of type: $0", NodeTypeToString(node.node_type)));
-      }
-      *value = KuduValue::FromInt(node.int_literal.value);
-      return Status::OK();
-    }
-    case kudu::client::KuduColumnSchema::FLOAT: {
-      if (node.node_type != TExprNodeType::FLOAT_LITERAL) {
-        return Status(Substitute("Cannot create predicate over column of type FLOAT with "
-            "value of type: $0", NodeTypeToString(node.node_type)));
-      }
-      *value = KuduValue::FromFloat(node.float_literal.value);
-      return Status::OK();
-    }
-    case kudu::client::KuduColumnSchema::DOUBLE: {
-      if (node.node_type != TExprNodeType::FLOAT_LITERAL) {
-        return Status(Substitute("Cannot create predicate over column of type DOUBLE with "
-            "value of type: $0", NodeTypeToString(node.node_type)));
-      }
-      *value = KuduValue::FromDouble(node.float_literal.value);
-      return Status::OK();
-    }
-    default:
-      // Should be unreachable.
-      LOG(DFATAL) << "Unsupported node type: " << node.node_type;
-  }
-  // Avoid compiler warning.
-  return Status("Unreachable");
-}
-
 void KuduScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -368,30 +216,22 @@ void KuduScanNode::DebugString(int indentation_level, stringstream* out) const {
   *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")";
 }
 
-TKuduKeyRange* KuduScanNode::GetNextKeyRange() {
+const string* KuduScanNode::GetNextScanToken() {
   unique_lock<mutex> lock(lock_);
-  if (next_scan_range_idx_ >= key_ranges_.size()) return NULL;
-  TKuduKeyRange* range = &key_ranges_[next_scan_range_idx_++];
-  return range;
+  if (next_scan_token_idx_ >= scan_tokens_.size()) return NULL;
+  const string* token = &scan_tokens_[next_scan_token_idx_++];
+  return token;
 }
 
 Status KuduScanNode::GetConjunctCtxs(vector<ExprContext*>* ctxs) {
   return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs);
 }
 
-void KuduScanNode::ClonePredicates(vector<KuduPredicate*>* predicates) {
-  unique_lock<mutex> l(lock_);
-  for (KuduPredicate* predicate: kudu_predicates_) {
-    predicates->push_back(predicate->Clone());
-  }
-}
-
-void KuduScanNode::ThreadTokenAvailableCb(
-    ThreadResourceMgr::ResourcePool* pool) {
+void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
   while (true) {
     unique_lock<mutex> lock(lock_);
-    // All done or all ranges are assigned.
-    if (done_ || next_scan_range_idx_ >= key_ranges_.size()) break;
+    // All done or all tokens are assigned.
+    if (done_ || next_scan_token_idx_ >= scan_tokens_.size()) break;
 
     // Check if we can get a token.
     if (!pool->TryAcquireThreadToken()) break;
@@ -399,14 +239,14 @@ void KuduScanNode::ThreadTokenAvailableCb(
     ++num_active_scanners_;
     COUNTER_ADD(num_scanner_threads_started_counter_, 1);
 
-    // Reserve the first range so no other thread picks it up.
-    TKuduKeyRange* range = &key_ranges_[next_scan_range_idx_++];
+    // Reserve the first token so no other thread picks it up.
+    const string* token = &scan_tokens_[next_scan_token_idx_++];
     string name = Substitute("scanner-thread($0)",
         num_scanner_threads_started_counter_->value());
 
-    VLOG(1) << "Thread started: " << name;
+    VLOG_RPC << "Thread started: " << name;
     scanner_threads_.AddThread(new Thread("kudu-scan-node", name,
-        &KuduScanNode::ScannerThread, this, name, range));
+        &KuduScanNode::RunScannerThread, this, name, token));
 
     if (runtime_state_->query_resource_mgr() != NULL) {
       runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
@@ -414,12 +254,12 @@ void KuduScanNode::ThreadTokenAvailableCb(
   }
 }
 
-Status KuduScanNode::ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range) {
-  RETURN_IF_ERROR(scanner->OpenNextRange(*key_range));
+Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_token) {
+  RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token));
   bool eos = false;
   while (!eos) {
     gscoped_ptr<RowBatch> row_batch(new RowBatch(
-          row_desc(), runtime_state_->batch_size(), mem_tracker()));
+        row_desc(), runtime_state_->batch_size(), mem_tracker()));
     RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
     while (!done_) {
       scanner->KeepKuduScannerAlive();
@@ -429,13 +269,12 @@ Status KuduScanNode::ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key
       }
     }
   }
-  // Mark the current scan range as complete.
   if (eos) scan_ranges_complete_counter()->Add(1);
   return Status::OK();
 }
 
-void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initial_range) {
-  DCHECK(initial_range != NULL);
+void KuduScanNode::RunScannerThread(const string& name, const string* initial_token) {
+  DCHECK(initial_token != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_TIMER(runtime_state_->total_cpu_timer());
 
@@ -443,29 +282,29 @@ void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initia
   // exceeded and is exiting early.
   bool optional_thread_exiting = false;
   KuduScanner scanner(this, runtime_state_);
-  Status status = scanner.Open();
 
+  const string* scan_token = initial_token;
+  Status status = scanner.Open();
   if (status.ok()) {
-    const TKuduKeyRange* key_range = initial_range;
-    while (!done_ && key_range != NULL) {
-      status = ProcessRange(&scanner, key_range);
+    while (!done_ && scan_token != NULL) {
+      status = ProcessScanToken(&scanner, *scan_token);
       if (!status.ok()) break;
 
       // Check if the number of optional threads has been exceeded.
       if (runtime_state_->resource_pool()->optional_exceeded()) {
         unique_lock<mutex> l(lock_);
         // Don't exit if this is the last thread. Otherwise, the scan will indicate it's
-        // done before all ranges have been processed.
+        // done before all scan tokens have been processed.
         if (num_active_scanners_ > 1) {
           --num_active_scanners_;
           optional_thread_exiting = true;
           break;
         }
       }
-      key_range = GetNextKeyRange();
+      scan_token = GetNextScanToken();
     }
-    scanner.Close();
   }
+  scanner.Close();
 
   {
     unique_lock<mutex> l(lock_);
@@ -485,8 +324,8 @@ void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initia
   }
 
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
-  // invokes ThreadTokenAvailableCb() which attempts to take the same lock.
-  VLOG(1) << "Thread done: " << name;
+  // invokes ThreadAvailableCb() which attempts to take the same lock.
+  VLOG_RPC << "Thread done: " << name;
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index d243cb8..b1848d7 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -31,50 +31,30 @@
 namespace impala {
 
 class KuduScanner;
-class Tuple;
 
-/// A scan node that scans Kudu TabletServers.
+/// A scan node that scans a Kudu table.
 ///
-/// This scan node takes a set of ranges and uses a Kudu client to retrieve the data
-/// belonging to those ranges from Kudu. The client's schema is rebuilt from the
-/// TupleDescriptors forwarded by the frontend so that we're sure all the scan nodes
-/// use the same schema, for the same scan.
+/// This takes a set of serialized Kudu scan tokens which encode the information needed
+/// for this scan. A Kudu client deserializes the tokens into kudu scanners, and those
+/// are used to retrieve the rows for this scan.
 class KuduScanNode : public ScanNode {
  public:
   KuduScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
 
   ~KuduScanNode();
 
-  /// Create Kudu schema and columns to slots mapping.
   virtual Status Prepare(RuntimeState* state);
-
-  /// Start Kudu scan.
   virtual Status Open(RuntimeState* state);
-
-  /// Fill the next row batch by fetching more data from the KuduScanner.
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
-  /// Close connections to Kudu.
   virtual void Close(RuntimeState* state);
 
  protected:
-  /// Write debug string of this into out.
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
-  FRIEND_TEST(KuduScanNodeTest, TestPushIntGEPredicateOnKey);
-  FRIEND_TEST(KuduScanNodeTest, TestPushIntEQPredicateOn2ndColumn);
-  FRIEND_TEST(KuduScanNodeTest, TestPushStringLEPredicateOn3rdColumn);
-  FRIEND_TEST(KuduScanNodeTest, TestPushTwoPredicatesOnNonMaterializedColumn);
   friend class KuduScanner;
 
-  kudu::client::KuduClient* kudu_client() {
-    return client_.get();
-  }
-
-  kudu::client::KuduTable* kudu_table() {
-    return table_.get();
-  }
+  kudu::client::KuduClient* kudu_client() { return client_.get(); }
 
   /// Tuple id resolved in Prepare() to set tuple_desc_.
   const TupleId tuple_id_;
@@ -84,19 +64,15 @@ class KuduScanNode : public ScanNode {
   /// Descriptor of tuples read from Kudu table.
   const TupleDescriptor* tuple_desc_;
 
-  /// The list of Kudu columns to project for the scan, extracted from the TupleDescriptor
-  /// and translated into Kudu format, i.e. matching the case of the Kudu schema.
-  std::vector<std::string> projected_columns_;
-
   /// The Kudu client and table. Scanners share these instances.
   std::tr1::shared_ptr<kudu::client::KuduClient> client_;
   std::tr1::shared_ptr<kudu::client::KuduTable> table_;
 
-  /// Set of ranges to be scanned.
-  std::vector<TKuduKeyRange> key_ranges_;
+  /// Set of scan tokens to be deserialized into Kudu scanners.
+  std::vector<std::string> scan_tokens_;
 
-  /// The next index in 'key_ranges_' to be assigned to a scanner.
-  int next_scan_range_idx_;
+  /// The next index in 'scan_tokens_' to be assigned. Protected by lock_.
+  int next_scan_token_idx_;
 
   // Outgoing row batches queue. Row batches are produced asynchronously by the scanner
   // threads and consumed by the main thread.
@@ -106,8 +82,8 @@ class KuduScanNode : public ScanNode {
   /// 'num_active_scanners_'.
   boost::mutex lock_;
 
-  /// The current status of the scan, set to non-OK if any problems occur, e.g. if an error
-  /// occurs in a scanner.
+  /// The current status of the scan, set to non-OK if any problems occur, e.g. if an
+  /// error occurs in a scanner.
   /// Protected by lock_
   Status status_;
 
@@ -115,14 +91,11 @@ class KuduScanNode : public ScanNode {
   /// Protected by lock_
   int num_active_scanners_;
 
-  /// Set to true when the scan is complete (either because all ranges were scanned, the limit
-  /// was reached or some error occurred).
+  /// Set to true when the scan is complete (either because all scan tokens have been
+  /// processed, the limit was reached or some error occurred).
   /// Protected by lock_
   volatile bool done_;
 
-  /// Maximum size of materialized_row_batches_.
-  int max_materialized_row_batches_;
-
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
 
@@ -131,55 +104,29 @@ class KuduScanNode : public ScanNode {
   static const std::string KUDU_READ_TIMER;
   static const std::string KUDU_ROUND_TRIPS;
 
-  /// The function names of the supported predicates.
-  static const std::string GE_FN;
-  static const std::string LE_FN;
-  static const std::string EQ_FN;
-
-  /// The set of conjuncts, in TExpr form, to be pushed to Kudu.
-  /// Received in TPlanNode::kudu_scan_node.
-  const std::vector<TExpr> pushable_conjuncts_;
-
-  /// The id of the callback added to the thread resource manager when thread token
+  /// The id of the callback added to the thread resource manager when a thread
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
   int thread_avail_cb_id_;
 
-  /// The set of conjuncts, in KuduPredicate form, to be pushed to Kudu.
-  /// Derived from 'pushable_conjuncts_'.
-  std::vector<kudu::client::KuduPredicate*> kudu_predicates_;
-
-  /// Returns a KuduValue with 'type' built from a literal in 'node'.
-  /// Expects that 'node' is a literal value.
-  Status GetExprLiteralBound(const TExprNode& node,
-      kudu::client::KuduColumnSchema::DataType type, kudu::client::KuduValue** value);
-
-  /// Returns a string with the name of the column that 'node' refers to.
-  void GetSlotRefColumnName(const TExprNode& node, string* col_name);
-
-  /// Transforms 'pushable_conjuncts_' received from the frontend into 'kudu_predicates_' that will
-  /// be set in all scanners.
-  Status TransformPushableConjunctsToRangePredicates();
-
   /// Called when scanner threads are available for this scan node. This will
   /// try to spin up as many scanner threads as the quota allows.
-  void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
+  void ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool);
 
   /// Main function for scanner thread which executes a KuduScanner. Begins by processing
-  /// 'initial_range', and continues processing ranges returned by 'GetNextKeyRange()'
-  /// until there are no more ranges, an error occurs, or the limit is reached.
-  void ScannerThread(const string& name, const TKuduKeyRange* initial_range);
-
-  /// Processes a single scan range. Row batches are fetched using 'scanner' and enqueued
-  /// in 'materialized_row_batches_' until the scanner reports eos for 'key_range', an
-  /// error occurs, or the limit is reached.
-  Status ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range);
+  /// 'initial_token', and continues processing scan tokens returned by
+  /// 'GetNextScanToken()' until there are none left, an error occurs, or the limit is
+  /// reached.
+  void RunScannerThread(const std::string& name, const std::string* initial_token);
 
-  /// Returns the next partition key range to read. Thread safe. Returns NULL if there are
-  /// no more ranges.
-  TKuduKeyRange* GetNextKeyRange();
+  /// Processes a single scan token. Row batches are fetched using 'scanner' and enqueued
+  /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or
+  /// the limit is reached.
+  Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token);
 
-  const std::vector<std::string>& projected_columns() const { return projected_columns_; }
+  /// Returns the next scan token. Thread safe. Returns NULL if there are no more scan
+  /// tokens.
+  const std::string* GetNextScanToken();
 
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
 
@@ -187,9 +134,6 @@ class KuduScanNode : public ScanNode {
   // have been open previously.
   Status GetConjunctCtxs(vector<ExprContext*>* ctxs);
 
-  // Clones the set of predicates to be set on scanners.
-  void ClonePredicates(vector<kudu::client::KuduPredicate*>* predicates);
-
   RuntimeProfile::Counter* kudu_read_timer() const { return kudu_read_timer_; }
   RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 66e17fa..57ba356 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -39,8 +39,6 @@
 #include "common/names.h"
 
 using kudu::client::KuduClient;
-using kudu::client::KuduColumnSchema;
-using kudu::client::KuduPredicate;
 using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
@@ -57,40 +55,15 @@ DEFINE_int32(kudu_scanner_timeout_sec, 60,
 
 namespace impala {
 
-namespace {
-
-// Sets up the scan range predicate on the scanner, i.e. the partition start/stop keys
-// of the partition the scanner is supposed to scan.
-Status SetupScanRangePredicate(const TKuduKeyRange& key_range,
-    kudu::client::KuduScanner* scanner) {
-  if (key_range.range_start_key.empty() && key_range.range_stop_key.empty()) {
-    return Status::OK();
-  }
-
-  if (!key_range.range_start_key.empty()) {
-    KUDU_RETURN_IF_ERROR(scanner->AddLowerBoundPartitionKeyRaw(
-            key_range.range_start_key), "adding scan range lower bound");
-  }
-  if (!key_range.range_stop_key.empty()) {
-    KUDU_RETURN_IF_ERROR(scanner->AddExclusiveUpperBoundPartitionKeyRaw(
-            key_range.range_stop_key), "adding scan range upper bound");
-  }
-  return Status::OK();
-}
-
-} // anonymous namespace
-
 KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
-    rows_scanned_current_block_(0),
-    last_alive_time_micros_(0) {
+    cur_kudu_batch_num_read_(0),
+    last_alive_time_micros_(0),
+    tuple_num_null_bytes_(scan_node_->tuple_desc()->num_null_bytes()) {
 }
 
 Status KuduScanner::Open() {
-  tuple_byte_size_ = scan_node_->tuple_desc()->byte_size();
-  tuple_num_null_bytes_ = scan_node_->tuple_desc()->num_null_bytes();
-
   // Store columns that need relocation when materialized into the
   // destination row batch.
   for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) {
@@ -129,60 +102,48 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
   Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
 
   // Main scan loop:
-  // Tries to fill 'row_batch' with rows from the last fetched block.
-  // If there are no rows to decode tries to get the next block from kudu.
-  // If there are no more blocks in the current range tries to get the next range.
-  // If there aren't any more rows, blocks or ranges then we're done.
-  while(true) {
+  // Tries to fill 'row_batch' with rows from cur_kudu_batch_.
+  // If there are no rows to decode, tries to get the next row batch from kudu.
+  // If this scanner has no more rows, the scanner is closed and eos is returned.
+  while (!*eos) {
     RETURN_IF_CANCELLED(state_);
-    // If the last fetched block has more rows, decode and if we filled up the batch
-    // return.
-    if (CurrentBlockHasMoreRows()) {
+
+    if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) {
       bool batch_done = false;
       RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done));
-      if (batch_done) return Status::OK();
+      if (batch_done) break;
     }
-    // If the current scanner has more blocks, fetch them.
-    if (CurrentRangeHasMoreBlocks()) {
-      RETURN_IF_ERROR(GetNextBlock());
+
+    if (scanner_->HasMoreRows()) {
+      RETURN_IF_ERROR(GetNextScannerBatch());
       continue;
     }
-    // No more blocks in the current scanner, close it.
-    CloseCurrentRange();
-    // No more rows or blocks, we're done.
+
+    CloseCurrentClientScanner();
     *eos = true;
-    return Status::OK();
   }
   return Status::OK();
 }
 
 void KuduScanner::Close() {
-  if (scanner_) CloseCurrentRange();
+  if (scanner_) CloseCurrentClientScanner();
   Expr::Close(conjunct_ctxs_, state_);
 }
 
-Status KuduScanner::OpenNextRange(const TKuduKeyRange& key_range)  {
+Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
   DCHECK(scanner_ == NULL);
-  scanner_.reset(new kudu::client::KuduScanner(scan_node_->kudu_table()));
-  KUDU_RETURN_IF_ERROR(scanner_->SetProjectedColumns(scan_node_->projected_columns()),
-      "Unable to set projected columns");
-
-  RETURN_IF_ERROR(SetupScanRangePredicate(key_range, scanner_.get()));
-
-  vector<KuduPredicate*> predicates;
-  scan_node_->ClonePredicates(&predicates);
-  for (KuduPredicate* predicate: predicates) {
-    KUDU_RETURN_IF_ERROR(scanner_->AddConjunctPredicate(predicate),
-                         "Unable to add conjunct predicate.");
-  }
+  kudu::client::KuduScanner* scanner;
+  KUDU_RETURN_IF_ERROR(kudu::client::KuduScanToken::DeserializeIntoScanner(
+      scan_node_->kudu_client(), scan_token, &scanner),
+      "Unable to deserialize scan token");
+  scanner_.reset(scanner);
 
   if (UNLIKELY(FLAGS_pick_only_leaders_for_tests)) {
     KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
                          "Could not set replica selection.");
   }
 
-  KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(
-      FLAGS_kudu_scanner_timeout_sec * 1000),
+  KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_scanner_timeout_sec * 1000),
       "Could not set scanner timeout");
 
   {
@@ -192,17 +153,17 @@ Status KuduScanner::OpenNextRange(const TKuduKeyRange& key_range)  {
   return Status::OK();
 }
 
-void KuduScanner::CloseCurrentRange() {
+void KuduScanner::CloseCurrentClientScanner() {
   DCHECK_NOTNULL(scanner_.get());
   scanner_->Close();
   scanner_.reset();
 }
 
 Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done) {
-  int rem_in_block = cur_kudu_batch_.NumRows() - rows_scanned_current_block_;
+  int num_rows_remaining = cur_kudu_batch_.NumRows() - cur_kudu_batch_num_read_;
   int rows_to_add = std::min(row_batch->capacity() - row_batch->num_rows(),
-      rem_in_block);
-  rows_scanned_current_block_ += rows_to_add;
+      num_rows_remaining);
+  cur_kudu_batch_num_read_ += rows_to_add;
   row_batch->CommitRows(rows_to_add);
   // If we've reached the capacity, or the LIMIT for the scan, return.
   if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
@@ -228,14 +189,14 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch,
 
   int num_rows = cur_kudu_batch_.NumRows();
   // Now iterate through the Kudu rows.
-  for (int krow_idx = rows_scanned_current_block_; krow_idx < num_rows; ++krow_idx) {
+  for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
     // Clear any NULL indicators set by a previous iteration.
     (*tuple_mem)->Init(tuple_num_null_bytes_);
 
     // Transform a Kudu row into an Impala row.
     KuduScanBatch::RowPtr krow = cur_kudu_batch_.Row(krow_idx);
     RETURN_IF_ERROR(KuduRowToImpalaTuple(krow, row_batch, *tuple_mem));
-    ++rows_scanned_current_block_;
+    ++cur_kudu_batch_num_read_;
 
     // Evaluate the conjuncts that haven't been pushed down to Kudu.
     if (conjunct_ctxs_.empty() ||
@@ -373,12 +334,12 @@ Status KuduScanner::KuduRowToImpalaTuple(const KuduScanBatch::RowPtr& row,
 }
 
 
-Status KuduScanner::GetNextBlock() {
+Status KuduScanner::GetNextScannerBatch() {
   SCOPED_TIMER(scan_node_->kudu_read_timer());
   int64_t now = MonotonicMicros();
   KUDU_RETURN_IF_ERROR(scanner_->NextBatch(&cur_kudu_batch_), "Unable to advance iterator");
   COUNTER_ADD(scan_node_->kudu_round_trips(), 1);
-  rows_scanned_current_block_ = 0;
+  cur_kudu_batch_num_read_ = 0;
   COUNTER_ADD(scan_node_->rows_read_counter(), cur_kudu_batch_.NumRows());
   last_alive_time_micros_ = now;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index beddd05..b31bb73 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -31,7 +31,9 @@ class RowBatch;
 class RuntimeState;
 class Tuple;
 
-/// Executes scans in Kudu based on the provided set of scan ranges.
+/// Wraps a Kudu client scanner to fetch row batches from Kudu. The Kudu client scanner
+/// is created from a scan token in OpenNextScanToken(), which then provides rows fetched
+/// by GetNext() until it reaches eos, and the caller may open another scan token.
 class KuduScanner {
  public:
   KuduScanner(KuduScanNode* scan_node, RuntimeState* state);
@@ -40,17 +42,17 @@ class KuduScanner {
   /// Does not actually open a kudu::client::KuduScanner.
   Status Open();
 
-  /// Opens a new kudu::client::KuduScanner to scan 'range'.
-  Status OpenNextRange(const TKuduKeyRange& range);
+  /// Opens a new kudu::client::KuduScanner using 'scan_token'.
+  Status OpenNextScanToken(const std::string& scan_token);
 
   /// Fetches the next batch from the current kudu::client::KuduScanner.
   Status GetNext(RowBatch* row_batch, bool* eos);
 
-  /// Sends a "Ping" to the Kudu TabletServer servicing the current scan, if there is one.
-  /// This serves the purpose of making the TabletServer keep the server side scanner alive
-  /// if the batch queue is full and no batches can be queued. If there are any errors,
-  /// they are ignored here, since we assume that we will just fail the next time we
-  /// try to read a batch.
+  /// Sends a "Ping" to the Kudu TabletServer servicing the current scan, if there is
+  /// one. This serves the purpose of making the TabletServer keep the server side
+  /// scanner alive if the batch queue is full and no batches can be queued. If there are
+  /// any errors, they are ignored here, since we assume that we will just fail the next
+  /// time we try to read a batch.
   void KeepKuduScannerAlive();
 
   /// Closes this scanner.
@@ -67,32 +69,22 @@ class KuduScanner {
   /// Returns true if 'slot' is Null in 'tuple'.
   bool IsSlotNull(Tuple* tuple, const SlotDescriptor& slot);
 
-  /// Returns true if the current block hasn't been fully scanned.
-  bool CurrentBlockHasMoreRows() {
-    return rows_scanned_current_block_ < cur_kudu_batch_.NumRows();
-  }
-
   /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch.
   ///  - 'batch' is the batch that will point to the new tuples.
   ///  - *tuple_mem should be the location to output tuples.
-  ///  - Sets 'batch_done' to true to indicate that the batch was filled to capacity or the limit
-  ///    was reached.
+  ///  - Sets 'batch_done' to true to indicate that the batch was filled to capacity or
+  ///    the limit was reached.
   Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem, bool* batch_done);
 
-  /// Returns true of the current kudu::client::KuduScanner has more rows.
-  bool CurrentRangeHasMoreBlocks() {
-    return scanner_->HasMoreRows();
-  }
-
-  /// Fetches the next block of the current kudu::client::KuduScanner.
-  Status GetNextBlock();
+  /// Fetches the next batch of rows from the current kudu::client::KuduScanner.
+  Status GetNextScannerBatch();
 
   /// Closes the current kudu::client::KuduScanner.
-  void CloseCurrentRange();
+  void CloseCurrentClientScanner();
 
-  /// Given a tuple, copies the values of those columns that require additional memory from memory
-  /// owned by the kudu::client::KuduScanner into memory owned by the RowBatch. Assumes that the
-  /// other columns are already materialized.
+  /// Given a tuple, copies the values of those columns that require additional memory
+  /// from memory owned by the kudu::client::KuduScanner into memory owned by the
+  /// RowBatch. Assumes that the other columns are already materialized.
   Status RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool);
 
   /// Transforms a kudu row into an Impala row. Columns that don't require auxiliary
@@ -113,16 +105,15 @@ class KuduScanner {
   KuduScanNode* scan_node_;
   RuntimeState* state_;
 
-  /// The set of key ranges being serviced by this scanner.
-  const vector<TKuduKeyRange> scan_ranges_;
-
-  /// The kudu::client::KuduScanner for the current range.
-  /// One such scanner is required per range as per-range start/stop keys can only be set once.
+  /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is
+  /// created for each scan token using KuduScanToken::DeserializeIntoScanner().
   boost::scoped_ptr<kudu::client::KuduScanner> scanner_;
 
   /// The current batch of retrieved rows.
   kudu::client::KuduScanBatch cur_kudu_batch_;
-  int rows_scanned_current_block_;
+
+  /// The number of rows already read from cur_kudu_batch_.
+  int cur_kudu_batch_num_read_;
 
   /// The last time a keepalive request or successful RPC was sent.
   int64_t last_alive_time_micros_;
@@ -130,11 +121,6 @@ class KuduScanner {
   /// The scanner's cloned copy of the conjuncts to apply.
   vector<ExprContext*> conjunct_ctxs_;
 
-  std::vector<std::string> projected_columns_;
-
-  /// Size of the materialized tuple in the row batch.
-  int tuple_byte_size_;
-
   /// Number of bytes needed to represent the null bits in the tuple.
   int tuple_num_null_bytes_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index 8edf1cb..aedf229 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -17,19 +17,16 @@
 
 #include "exec/kudu-util.h"
 
-#include <algorithm>
+#include <string>
+#include <sstream>
 
-#include <boost/algorithm/string.hpp>
-#include <boost/unordered_set.hpp>
 #include <kudu/client/callbacks.h>
 #include <kudu/client/schema.h>
 
-#include "runtime/descriptors.h"
-#include "gutil/strings/substitute.h"
-
+#include "common/logging.h"
 #include "common/names.h"
+#include "common/status.h"
 
-using boost::algorithm::to_lower_copy;
 using kudu::client::KuduSchema;
 using kudu::client::KuduColumnSchema;
 
@@ -57,122 +54,13 @@ Status CheckKuduAvailability() {
   return Status(TErrorCode::KUDU_NOT_SUPPORTED_ON_OS);
 }
 
-Status ImpalaToKuduType(const ColumnType& impala_type,
-    kudu::client::KuduColumnSchema::DataType* kudu_type) {
-  using kudu::client::KuduColumnSchema;
-
-  switch (impala_type.type) {
-    case TYPE_VARCHAR:
-    case TYPE_STRING:
-      *kudu_type = KuduColumnSchema::STRING;
-      break;
-    case TYPE_TINYINT:
-      *kudu_type = KuduColumnSchema::INT8;
-      break;
-    case TYPE_SMALLINT:
-      *kudu_type = KuduColumnSchema::INT16;
-      break;
-    case TYPE_INT:
-      *kudu_type = KuduColumnSchema::INT32;
-      break;
-    case TYPE_BIGINT:
-      *kudu_type = KuduColumnSchema::INT64;
-      break;
-    case TYPE_FLOAT:
-      *kudu_type = KuduColumnSchema::FLOAT;
-      break;
-    case TYPE_DOUBLE:
-      *kudu_type = KuduColumnSchema::DOUBLE;
-      break;
-    case TYPE_BOOLEAN:
-      *kudu_type = KuduColumnSchema::BOOL;
-      break;
-    default:
-      return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(impala_type.type));
-  }
-  return Status::OK();
-}
-
-Status KuduToImpalaType(const kudu::client::KuduColumnSchema::DataType& kudu_type,
-    ColumnType* impala_type) {
-  using kudu::client::KuduColumnSchema;
-
-  switch (kudu_type) {
-    case KuduColumnSchema::STRING:
-      *impala_type = TYPE_STRING;
-      break;
-    case KuduColumnSchema::INT8:
-      *impala_type = TYPE_TINYINT;
-      break;
-    case KuduColumnSchema::INT16:
-      *impala_type = TYPE_SMALLINT;
-      break;
-    case KuduColumnSchema::INT32:
-      *impala_type = TYPE_INT;
-      break;
-    case KuduColumnSchema::INT64:
-      *impala_type = TYPE_BIGINT;
-      break;
-    case KuduColumnSchema::FLOAT:
-      *impala_type = TYPE_FLOAT;
-      break;
-    case KuduColumnSchema::DOUBLE:
-      *impala_type = TYPE_DOUBLE;
-      break;
-    default:
-      return Status(TErrorCode::KUDU_IMPALA_TYPE_MISSING,
-                    KuduColumnSchema::DataTypeToString(kudu_type));
+string KuduSchemaDebugString(const KuduSchema& schema) {
+  stringstream ss;
+  for (int i = 0; i < schema.num_columns(); ++i) {
+    const KuduColumnSchema& col = schema.Column(i);
+    ss << col.name() << " " << KuduColumnSchema::DataTypeToString(col.type()) << "\n";
   }
-  return Status::OK();
-}
-
-/// Returns a map of lower case column names to column indexes in 'map'.
-/// Returns an error Status if 'schema' had more than one column with the same lower
-/// case name.
-Status MapLowercaseKuduColumnNamesToIndexes(const kudu::client::KuduSchema& schema,
-    IdxByLowerCaseColName* map) {
-  DCHECK(map != NULL);
-  for(size_t i = 0; i < schema.num_columns(); ++i) {
-    string lower_case_col_name = to_lower_copy(schema.Column(i).name());
-    if (map->find(lower_case_col_name) != map->end()) {
-      return Status(strings::Substitute("There was already a column with name: '$0' "
-          "in the schema", lower_case_col_name));
-    }
-    (*map)[lower_case_col_name] = i;
-  }
-  return Status::OK();
-}
-
-Status ProjectedColumnsFromTupleDescriptor(const TupleDescriptor& tuple_desc,
-    vector<string>* projected_columns, const KuduSchema& schema) {
-  DCHECK(projected_columns != NULL);
-  projected_columns->clear();
-
-  IdxByLowerCaseColName idx_by_lc_name;
-  RETURN_IF_ERROR(MapLowercaseKuduColumnNamesToIndexes(schema, &idx_by_lc_name));
-
-  // In debug mode try a dynamic cast. If it fails it means that the
-  // TableDescriptor is not an instance of KuduTableDescriptor.
-  DCHECK(dynamic_cast<const KuduTableDescriptor*>(tuple_desc.table_desc()) != NULL)
-      << "TableDescriptor must be an instance KuduTableDescriptor.";
-
-  const KuduTableDescriptor* table_desc =
-      static_cast<const KuduTableDescriptor*>(tuple_desc.table_desc());
-  LOG(INFO) << "Table desc for schema: " << table_desc->DebugString();
-
-  const std::vector<SlotDescriptor*>& slots = tuple_desc.slots();
-  for (int i = 0; i < slots.size(); ++i) {
-    int col_idx = slots[i]->col_pos();
-    string impala_col_name = to_lower_copy(table_desc->col_descs()[col_idx].name());
-    IdxByLowerCaseColName::const_iterator iter = idx_by_lc_name.find(impala_col_name);
-    if (iter == idx_by_lc_name.end()) {
-      return Status(strings::Substitute("Could not find column: $0 in the Kudu schema.",
-         impala_col_name));
-    }
-    projected_columns->push_back(schema.Column((*iter).second).name());
-  }
-
-  return Status::OK();
+  return ss.str();
 }
 
 void LogKuduMessage(void* unused, kudu::client::KuduLogSeverity severity,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index fd779cb..7957f7a 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -21,15 +21,9 @@
 #include <kudu/client/callbacks.h>
 #include <kudu/client/client.h>
 
-#include <boost/unordered_map.hpp>
-
 namespace impala {
 
-class TExpr;
-class KuduTableDescriptor;
 class Status;
-class TupleDescriptor;
-struct ColumnType;
 
 /// Returns false when running on an operating system that Kudu doesn't support. If this
 /// check fails, there is no way Kudu should be expected to work. Exposed for testing.
@@ -43,25 +37,8 @@ Status CheckKuduAvailability();
 /// Convenience function for the bool equivalent of CheckKuduAvailability().
 bool KuduIsAvailable();
 
-Status ImpalaToKuduType(const ColumnType& impala_type,
-    kudu::client::KuduColumnSchema::DataType* kudu_type);
-
-Status KuduToImpalaType(const kudu::client::KuduColumnSchema::DataType& kudu_type,
-    ColumnType* impala_type);
-
-typedef boost::unordered_map<std::string, int> IdxByLowerCaseColName;
-
-/// Returns a map of lower case column names to column indexes in 'map'.
-/// Returns an error Status if 'schema' had more than one column with the same lower
-/// case name.
-Status MapLowercaseKuduColumnNamesToIndexes(const kudu::client::KuduSchema& schema,
-    IdxByLowerCaseColName* map);
-
-/// Gets the projected columns from the TupleDescriptor.
-/// Translates Impala's lower case column names to the version used by Kudu.
-/// 'projected_columns' is expected to be not NULL and will be cleared.
-Status ProjectedColumnsFromTupleDescriptor(const TupleDescriptor& tuple_desc,
-    std::vector<std::string>* projected_columns, const kudu::client::KuduSchema& schema);
+/// Returns a debug string for the KuduSchema.
+std::string KuduSchemaDebugString(const kudu::client::KuduSchema& schema);
 
 /// Initializes Kudu's logging by binding a callback that logs back to Impala's glog. This
 /// also sets Kudu's verbose logging to whatever level is set in Impala.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 9a865f0..f4b90cc 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -970,7 +970,7 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
   int64_t scan_range_length = 0;
   if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
     scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
-  } else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
+  } else if (scan_range_locations.scan_range.__isset.kudu_scan_token) {
     // Hack so that kudu ranges are well distributed.
     // TODO: KUDU-1133 Use the tablet size instead.
     scan_range_length = 1000;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 7f4df7f..39750c0 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -285,7 +285,7 @@ export KUDU_MASTER_PORT="${KUDU_MASTER_PORT:-7051}"
 # TODO: Figure out a way to use a snapshot version without causing a lot of breakage due
 #       to nightly changes from Kudu. The version below is the last released version but
 #       before release this needs to be updated to the version about to be released.
-export KUDU_JAVA_VERSION=0.10.0-SNAPSHOT
+export KUDU_JAVA_VERSION=1.0.0-SNAPSHOT
 
 if [[ $OSTYPE == "darwin"* ]]; then
   IMPALA_CYRUS_SASL_VERSION=2.1.26

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 3e31120..0219bcd 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -173,19 +173,13 @@ struct THBaseKeyRange {
   2: optional string stopKey
 }
 
-// Key range to query a Kudu table
-struct TKuduKeyRange {
-  1: optional binary range_start_key
-  2: optional binary range_stop_key
-}
-
 // Specification of an individual data range which is held in its entirety
 // by a storage server
 struct TScanRange {
-  // one of these must be set for every TScanRange2
+  // one of these must be set for every TScanRange
   1: optional THdfsFileSplit hdfs_file_split
   2: optional THBaseKeyRange hbase_key_range
-  3: optional TKuduKeyRange kudu_key_range
+  3: optional binary kudu_scan_token
 }
 
 struct THdfsScanNode {
@@ -247,9 +241,6 @@ struct THBaseScanNode {
 
 struct TKuduScanNode {
   1: required Types.TTupleId tuple_id
-
-  // List of conjuncts that can be pushed down to Kudu.
-  2: optional list<Exprs.TExpr> kudu_conjuncts
 }
 
 struct TEqJoinCondition {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 3870262..f7bffb9 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -272,7 +272,7 @@ under the License.
     </dependency>
 
     <dependency>
-      <groupId>org.kududb</groupId>
+      <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
       <version>${kudu.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/Expr.java b/fe/src/main/java/com/cloudera/impala/analysis/Expr.java
index 91d240a..fdc5bf1 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/Expr.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/Expr.java
@@ -1190,8 +1190,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   /**
-   * For children of 'this' that are constant expressions capable of being expressed
-   * as LiteralExprs, evaluate them in the BE and substitute the child with the
+   * For children of 'this' that are constant expressions and the type of which has a
+   * LiteralExpr subclass, evaluate them in the BE and substitute the child with the
    * resulting LiteralExpr. Modifies 'this' in place and does not re-analyze it. Hence,
    * it is not safe to evaluate the modified expr in the BE as the resolved fn_ may be
    * incorrect given the new arguments.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/analysis/LiteralExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LiteralExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/LiteralExpr.java
index d387b54..f5eedbb 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/LiteralExpr.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/LiteralExpr.java
@@ -49,8 +49,8 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
   }
 
   /**
-   * Returns an analyzed literal of 'type'. Returns null for types that cannot be
-   * expressed as literals, e.g. TIMESTAMP.
+   * Returns an analyzed literal of 'type'. Returns null for types that do not have a
+   * LiteralExpr subclass, e.g. TIMESTAMP.
    */
   public static LiteralExpr create(String value, Type type) throws AnalysisException {
     Preconditions.checkArgument(type.isValid());
@@ -151,7 +151,7 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
   /**
    * Evaluates the given constant expr and returns its result as a LiteralExpr.
    * Assumes expr has been analyzed. Returns constExpr if is it already a LiteralExpr.
-   * Returns null for types that cannot be expressed as literals, e.g. TIMESTAMP.
+   * Returns null for types that do not have a LiteralExpr subclass, e.g. TIMESTAMP.
    * TODO: Support non-scalar types.
    */
   public static LiteralExpr create(Expr constExpr, TQueryCtx queryCtx)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
index 28f4133..71d897d 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/KuduTable.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.log4j.Logger;
-import org.kududb.client.KuduClient;
-import org.kududb.client.LocatedTablet;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.LocatedTablet;
 
 import com.cloudera.impala.common.ImpalaRuntimeException;
 import com.cloudera.impala.thrift.TCatalogObjectType;
@@ -248,7 +248,7 @@ public class KuduTable extends Table {
 
     try (KuduClient client = new KuduClient.KuduClientBuilder(
         getKuduMasterAddresses()).build()) {
-      org.kududb.client.KuduTable kuduTable = client.openTable(kuduTableName_);
+      org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
           kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS);
       for (LocatedTablet tab: tablets) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java b/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java
index 919eebb..ecfeb1a 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java
@@ -28,13 +28,13 @@ import java.util.List;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.kududb.ColumnSchema;
-import org.kududb.ColumnSchema.ColumnSchemaBuilder;
-import org.kududb.Schema;
-import org.kududb.Type;
-import org.kududb.client.CreateTableOptions;
-import org.kududb.client.KuduClient;
-import org.kududb.client.PartialRow;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.PartialRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +85,7 @@ public class KuduDdlDelegate extends DdlDelegate {
         }
 
         // Check if the external table matches the schema
-        org.kududb.client.KuduTable kuduTable = client.openTable(kuduTableName);
+        org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName);
         if (!compareSchema(msTbl_, kuduTable)) {
           throw new ImpalaRuntimeException(String.format(
               "Table %s (%s) has a different schema in Kudu than in Hive.",