Posted to commits@impala.apache.org by mi...@apache.org on 2017/05/02 15:00:27 UTC

[1/2] incubator-impala git commit: IMPALA-3742: Partitions and sorts INSERTs for Kudu tables

Repository: incubator-impala
Updated Branches:
  refs/heads/master 77304530f -> 8b459dffe


IMPALA-3742: Partitions and sorts INSERTs for Kudu tables

Bulk DMLs (INSERT, UPSERT, UPDATE, and DELETE) for Kudu
are currently painful because we send rows to Kudu in an
arbitrary order, neither partitioned nor sorted. This
creates a lot of work for Kudu, which must partition and
sort the data itself before writing, making writes slow
and prone to timeouts.

We can alleviate this by sending the rows to Kudu already
partitioned and sorted. This patch partitions and sorts
rows according to Kudu's partitioning scheme for INSERTs
and UPSERTs. A followup patch will handle UPDATE and DELETE.
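
For example, the first case in the updated kudu-upsert.test below
shows the new distributed plan shape: rows are exchanged according
to the target table's Kudu partitioning and then sorted on the
partition index and primary key before reaching the sink.

UPSERT INTO KUDU [functional_kudu.testtbl]
|
02:SORT
|  order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
|
01:EXCHANGE [KUDU(KuduPartition(bigint_col))]
|
00:SCAN HDFS [functional.alltypes]
   partitions=1/24 files=1 size=20.36KB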

It accomplishes this by inserting an exchange node and a sort
node into the plan before the operation. Both the exchange and
the sort are given a KuduPartitionExpr which takes a row and
calls into the Kudu client to return its partition number.
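
As a reference for how the partition index is obtained, below is a
minimal, standalone sketch of the Kudu C++ client calls that
KuduPartitionExpr relies on. This is illustrative only, not Impala
code: the helper name PartitionOfRow is made up for the example, and
it assumes a reachable Kudu master and a table whose first column is
an INT32 partition key.

#include <cstdint>
#include <memory>
#include <string>

#include <kudu/client/client.h>
#include <kudu/common/partial_row.h>

using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduPartitioner;
using kudu::client::KuduPartitionerBuilder;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;

// Returns the partition index of a single-column key via 'partition',
// or a non-OK status if any client call fails.
kudu::Status PartitionOfRow(const std::string& master_addr,
    const std::string& table_name, int32_t key, int32_t* partition) {
  // Connect to the cluster and open the target table.
  shared_ptr<KuduClient> client;
  kudu::Status s =
      KuduClientBuilder().add_master_server_addr(master_addr).Build(&client);
  if (!s.ok()) return s;
  shared_ptr<KuduTable> table;
  s = client->OpenTable(table_name, &table);
  if (!s.ok()) return s;

  // Build a partitioner from the table's partition schema.
  KuduPartitionerBuilder builder(table);
  KuduPartitioner* raw_partitioner;
  s = builder.Build(&raw_partitioner);
  if (!s.ok()) return s;
  std::unique_ptr<KuduPartitioner> partitioner(raw_partitioner);

  // Fill only the partition column(s) of a partial row, then ask the
  // partitioner which partition the row falls into.
  std::unique_ptr<kudu::KuduPartialRow> row(table->schema().NewRow());
  s = row->SetInt32(0, key);  // assumes column 0 is the partition column
  if (!s.ok()) return s;
  return partitioner->PartitionRow(*row, partition);
}

In the patch itself, KuduPartitionExpr::Prepare() builds the
partitioner once per expression, and GetIntVal() fills the partial
row via the new WriteKuduRowValue() helper for each input row.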

It also disallows INSERT hints for Kudu tables, since the
hints that we support (SHUFFLE, CLUSTER, SORTBY) no longer
make sense for them.

Testing:
- Updated planner tests.
- Ran the Kudu functional tests.
- Ran performance tests demonstrating that we can now handle much
  larger inserts without hitting timeouts.

Change-Id: I84ce0032a1b10958fdf31faef225372c5c38fdc4
Reviewed-on: http://gerrit.cloudera.org:8080/6559
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/801c95f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/801c95f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/801c95f3

Branch: refs/heads/master
Commit: 801c95f39f9de6c29380910274f97748ea8e47a9
Parents: 7730453
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Apr 5 12:35:53 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 2 01:40:43 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc                  |  48 +--------
 be/src/exec/kudu-util.cc                        |  52 +++++++++
 be/src/exec/kudu-util.h                         |   9 ++
 be/src/exprs/CMakeLists.txt                     |   1 +
 be/src/exprs/expr-context.h                     |   1 +
 be/src/exprs/expr.cc                            |   4 +
 be/src/exprs/kudu-partition-expr.cc             |  94 ++++++++++++++++
 be/src/exprs/kudu-partition-expr.h              |  62 +++++++++++
 be/src/runtime/coordinator.cc                   |  12 +++
 be/src/runtime/data-stream-sender.cc            |  42 ++++++--
 be/src/runtime/data-stream-sender.h             |   7 +-
 be/src/scheduling/scheduler.cc                  |   3 +-
 common/thrift/Exprs.thrift                      |  16 ++-
 common/thrift/Partitions.thrift                 |   8 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  70 +++++++-----
 .../impala/analysis/KuduPartitionExpr.java      |  94 ++++++++++++++++
 .../org/apache/impala/catalog/KuduTable.java    |   9 ++
 .../apache/impala/planner/DataPartition.java    |   8 +-
 .../impala/planner/DistributedPlanner.java      |  22 +++-
 .../java/org/apache/impala/planner/Planner.java |  16 ++-
 .../org/apache/impala/planner/TableSink.java    |   2 -
 .../impala/analysis/AnalyzeStmtsTest.java       |  16 ++-
 .../impala/analysis/AnalyzeUpsertStmtTest.java  |   9 +-
 .../queries/PlannerTest/kudu-upsert.test        | 106 ++++++++++++++++++-
 .../queries/PlannerTest/kudu.test               |  39 ++++---
 .../queries/QueryTest/kudu_insert.test          |  36 -------
 26 files changed, 616 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 9b0085c..f09b832 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -251,53 +251,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
       }
 
       PrimitiveType type = output_expr_ctxs_[j]->root()->type().type;
-      switch (type) {
-        case TYPE_VARCHAR:
-        case TYPE_STRING: {
-          StringValue* sv = reinterpret_cast<StringValue*>(value);
-          kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
-              "Could not add Kudu WriteOp.");
-          break;
-        }
-        case TYPE_FLOAT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetFloat(col, *reinterpret_cast<float*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_DOUBLE:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetDouble(col, *reinterpret_cast<double*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_BOOLEAN:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetBool(col, *reinterpret_cast<bool*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_TINYINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt8(col, *reinterpret_cast<int8_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_SMALLINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt16(col, *reinterpret_cast<int16_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_INT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt32(col, *reinterpret_cast<int32_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        case TYPE_BIGINT:
-          KUDU_RETURN_IF_ERROR(
-              write->mutable_row()->SetInt64(col, *reinterpret_cast<int64_t*>(value)),
-              "Could not add Kudu WriteOp.");
-          break;
-        default:
-          return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
-      }
+      WriteKuduRowValue(write->mutable_row(), col, type, value);
     }
     if (add_row) write_ops.push_back(move(write));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index e7afb7c..59a4c81 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -27,6 +27,7 @@
 #include "common/logging.h"
 #include "common/names.h"
 #include "common/status.h"
+#include "runtime/string-value.h"
 
 using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
@@ -107,4 +108,55 @@ void InitKuduLogging() {
   kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
 }
 
+Status WriteKuduRowValue(kudu::KuduPartialRow* row, int col, PrimitiveType type,
+    const void* value, bool copy_strings) {
+  // TODO: codegen this to eliminate branching on type.
+  switch (type) {
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+      const StringValue* sv = reinterpret_cast<const StringValue*>(value);
+      kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
+      if (copy_strings) {
+        KUDU_RETURN_IF_ERROR(row->SetString(col, slice), "Could not set Kudu row value.");
+      } else {
+        KUDU_RETURN_IF_ERROR(
+            row->SetStringNoCopy(col, slice), "Could not set Kudu row value.");
+      }
+      break;
+    }
+    case TYPE_FLOAT:
+      KUDU_RETURN_IF_ERROR(row->SetFloat(col, *reinterpret_cast<const float*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_DOUBLE:
+      KUDU_RETURN_IF_ERROR(row->SetDouble(col, *reinterpret_cast<const double*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_BOOLEAN:
+      KUDU_RETURN_IF_ERROR(row->SetBool(col, *reinterpret_cast<const bool*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_TINYINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt8(col, *reinterpret_cast<const int8_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_SMALLINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt16(col, *reinterpret_cast<const int16_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_INT:
+      KUDU_RETURN_IF_ERROR(row->SetInt32(col, *reinterpret_cast<const int32_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    case TYPE_BIGINT:
+      KUDU_RETURN_IF_ERROR(row->SetInt64(col, *reinterpret_cast<const int64_t*>(value)),
+          "Could not set Kudu row value.");
+      break;
+    default:
+      return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
+  }
+
+  return Status::OK();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 7774812..27765df 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -21,6 +21,8 @@
 #include <kudu/client/callbacks.h>
 #include <kudu/client/client.h>
 
+#include "runtime/types.h"
+
 namespace impala {
 
 class Status;
@@ -55,6 +57,13 @@ void InitKuduLogging();
 void LogKuduMessage(kudu::client::KuduLogSeverity severity, const char* filename,
     int line_number, const struct ::tm* time, const char* message, size_t message_len);
 
+/// Casts 'value' according to 'type' and writes it into 'row' at position 'col'.
+/// If 'type' is STRING or VARCHAR, 'copy_strings' determines if 'value' will be copied
+/// into memory owned by the row. If false, string data must remain valid while the row is
+/// being used.
+Status WriteKuduRowValue(kudu::KuduPartialRow* row, int col, PrimitiveType type,
+    const void* value, bool copy_strings = true);
+
 /// Takes a Kudu status and returns an impala one, if it's not OK.
 #define KUDU_RETURN_IF_ERROR(expr, prepend) \
   do { \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 87a9ae0..c3cc151 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(Exprs
   in-predicate-ir.cc
   is-not-empty-predicate.cc
   is-null-predicate-ir.cc
+  kudu-partition-expr.cc
   like-predicate.cc
   like-predicate-ir.cc
   literal.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 6903105..fe9bf44 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -152,6 +152,7 @@ class ExprContext {
   friend class CaseExpr;
   friend class HiveUdfCall;
   friend class ScalarFnCall;
+  friend class KuduPartitionExpr;
 
   /// FunctionContexts for each registered expression. The FunctionContexts are created
   /// and owned by this ExprContext.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.cc b/be/src/exprs/expr.cc
index 32c16a0..c7fe2ab 100644
--- a/be/src/exprs/expr.cc
+++ b/be/src/exprs/expr.cc
@@ -44,6 +44,7 @@
 #include "exprs/in-predicate.h"
 #include "exprs/is-not-empty-predicate.h"
 #include "exprs/is-null-predicate.h"
+#include "exprs/kudu-partition-expr.h"
 #include "exprs/like-predicate.h"
 #include "exprs/literal.h"
 #include "exprs/math-functions.h"
@@ -262,6 +263,9 @@ Status Expr::CreateExpr(ObjectPool* pool, const TExprNode& texpr_node, Expr** ex
     case TExprNodeType::IS_NOT_EMPTY_PRED:
       *expr = pool->Add(new IsNotEmptyPredicate(texpr_node));
       return Status::OK();
+    case TExprNodeType::KUDU_PARTITION_EXPR:
+      *expr = pool->Add(new KuduPartitionExpr(texpr_node));
+      return Status::OK();
     default:
       stringstream os;
       os << "Unknown expr node type: " << texpr_node.node_type;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/kudu-partition-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.cc b/be/src/exprs/kudu-partition-expr.cc
new file mode 100644
index 0000000..2faaee5
--- /dev/null
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/kudu-partition-expr.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "exec/kudu-util.h"
+#include "exprs/expr-context.h"
+#include "runtime/query-state.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "runtime/types.h"
+
+namespace impala {
+
+KuduPartitionExpr::KuduPartitionExpr(const TExprNode& node)
+  : Expr(node), tkudu_partition_expr_(node.kudu_partition_expr) {}
+
+Status KuduPartitionExpr::Prepare(
+    RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx) {
+  RETURN_IF_ERROR(Expr::Prepare(state, row_desc, ctx));
+  DCHECK_EQ(tkudu_partition_expr_.referenced_columns.size(), children_.size());
+
+  // Create the KuduPartitioner we'll use to get the partition index for each row.
+  TableDescriptor* table_desc =
+      state->desc_tbl().GetTableDescriptor(tkudu_partition_expr_.target_table_id);
+  DCHECK(table_desc != nullptr);
+  DCHECK(dynamic_cast<KuduTableDescriptor*>(table_desc))
+      << "Target table for KuduPartitioner must be a Kudu table.";
+  table_desc_ = static_cast<KuduTableDescriptor*>(table_desc);
+  kudu::client::KuduClient* client;
+  RETURN_IF_ERROR(
+      state->query_state()->GetKuduClient(table_desc_->kudu_master_addresses(), &client));
+  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table;
+  KUDU_RETURN_IF_ERROR(
+      client->OpenTable(table_desc_->table_name(), &table), "Failed to open Kudu table.");
+  kudu::client::KuduPartitionerBuilder b(table);
+  kudu::client::KuduPartitioner* partitioner;
+  KUDU_RETURN_IF_ERROR(b.Build(&partitioner), "Failed to build Kudu partitioner.");
+  partitioner_.reset(partitioner);
+  row_.reset(table->schema().NewRow());
+
+  return Status::OK();
+}
+
+IntVal KuduPartitionExpr::GetIntVal(ExprContext* ctx, const TupleRow* row) {
+  for (int i = 0; i < children_.size(); ++i) {
+    void* val = ctx->GetValue(GetChild(i), row);
+    if (val == NULL) {
+      // We don't currently support nullable partition columns, but pass it along and let
+      // the KuduTableSink generate the error message.
+      return IntVal(-1);
+    }
+    int col = tkudu_partition_expr_.referenced_columns[i];
+    const ColumnDescriptor& col_desc = table_desc_->col_descs()[col];
+    PrimitiveType type = col_desc.type().type;
+    Status s = WriteKuduRowValue(row_.get(), col, type, val, false);
+    // This can only fail if we set a col to an incorrect type, which would be a bug in
+    // planning, so we can DCHECK.
+    DCHECK(s.ok()) << "WriteKuduRowValue failed for col = " << col_desc.name()
+                   << " and type = " << col_desc.type() << ": " << s.GetDetail();
+  }
+
+  int32_t kudu_partition = -1;
+  kudu::Status s = partitioner_->PartitionRow(*row_.get(), &kudu_partition);
+  // This can only fail if we fail to supply some of the partition cols, which would be a
+  // bug in planning, so we can DCHECK.
+  DCHECK(s.ok()) << "KuduPartitioner::PartitionRow failed on row = '" << row_->ToString()
+                 << "': " << s.ToString();
+  return IntVal(kudu_partition);
+}
+
+Status KuduPartitionExpr::GetCodegendComputeFn(
+    LlvmCodeGen* codegen, llvm::Function** fn) {
+  return Status("Error: KuduPartitionExpr::GetCodegendComputeFn not implemented.");
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/exprs/kudu-partition-expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.h b/be/src/exprs/kudu-partition-expr.h
new file mode 100644
index 0000000..6620338
--- /dev/null
+++ b/be/src/exprs/kudu-partition-expr.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_
+#define IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_
+
+#include <kudu/client/client.h>
+
+#include "exprs/expr.h"
+
+namespace impala {
+
+class KuduTableDescriptor;
+class TExprNode;
+class TKuduPartitionExpr;
+
+/// Expr that calls into the Kudu client to determine the partition index for rows.
+/// Returns -1 if the row doesn't have a partition or if an error is encountered.
+/// The children of this Expr produce the values for the partition columns.
+class KuduPartitionExpr : public Expr {
+ protected:
+  friend class Expr;
+
+  KuduPartitionExpr(const TExprNode& node);
+
+  virtual Status Prepare(
+      RuntimeState* state, const RowDescriptor& row_desc, ExprContext* ctx);
+
+  virtual IntVal GetIntVal(ExprContext* ctx, const TupleRow* row);
+
+  virtual Status GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn);
+
+ private:
+  TKuduPartitionExpr tkudu_partition_expr_;
+
+  /// Descriptor of the table to use the partitioning scheme from. Set in Prepare().
+  KuduTableDescriptor* table_desc_;
+
+  /// Used to call into Kudu to determine partitions. Set in Prepare().
+  std::unique_ptr<kudu::client::KuduPartitioner> partitioner_;
+
+  /// Stores the col values for each row that is partitioned.
+  std::unique_ptr<kudu::KuduPartialRow> row_;
+};
+
+} // namespace impala
+
+#endif // IMPALA_EXPRS_KUDU_PARTITION_EXPR_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a8284c2..bede0ef 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1842,6 +1842,18 @@ void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
     table_ids.insert(fragment.output_sink.table_sink.target_table_id);
   }
 
+  // For DataStreamSinks that partition according to the partitioning scheme of a Kudu
+  // table, we need the corresponding tableId.
+  if (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
+      && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
+      && fragment.output_sink.stream_sink.output_partition.type == TPartitionType::KUDU) {
+    TDataPartition partition = fragment.output_sink.stream_sink.output_partition;
+    DCHECK_EQ(partition.partition_exprs.size(), 1);
+    DCHECK(partition.partition_exprs[0].nodes[0].__isset.kudu_partition_expr);
+    table_ids.insert(
+        partition.partition_exprs[0].nodes[0].kudu_partition_expr.target_table_id);
+  }
+
   // Iterate over all TTableDescriptor(s) and add the ones that are needed.
   for (const TTableDescriptor& table_desc: desc_tbl_.tableDescriptors) {
     if (table_ids.find(table_desc.id) == table_ids.end()) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 9e0ddc7..dc98c49 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -331,6 +331,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
     int per_channel_buffer_size)
   : DataSink(row_desc),
     sender_id_(sender_id),
+    partition_type_(sink.output_partition.type),
     current_channel_idx_(0),
     flushed_(false),
     closed_(false),
@@ -339,13 +340,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
     thrift_transmit_timer_(NULL),
     bytes_sent_counter_(NULL),
     total_sent_rows_counter_(NULL),
-    dest_node_id_(sink.dest_node_id) {
+    dest_node_id_(sink.dest_node_id),
+    next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
   DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
       || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-      || sink.output_partition.type == TPartitionType::RANDOM);
-  broadcast_ = sink.output_partition.type == TPartitionType::UNPARTITIONED;
-  random_ = sink.output_partition.type == TPartitionType::RANDOM;
+      || sink.output_partition.type == TPartitionType::RANDOM
+      || sink.output_partition.type == TPartitionType::KUDU);
   // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
   for (int i = 0; i < destinations.size(); ++i) {
     channels_.push_back(
@@ -354,17 +355,18 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
                     sink.dest_node_id, per_channel_buffer_size));
   }
 
-  if (broadcast_ || random_) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || partition_type_ == TPartitionType::RANDOM) {
     // Randomize the order we open/transmit to channels to avoid thundering herd problems.
     srand(reinterpret_cast<uint64_t>(this));
     random_shuffle(channels_.begin(), channels_.end());
   }
 
-  if (sink.output_partition.type == TPartitionType::HASH_PARTITIONED) {
+  if (partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::KUDU) {
     // TODO: move this to Init()? would need to save 'sink' somewhere
-    Status status =
-        Expr::CreateExprTrees(pool, sink.output_partition.partition_exprs,
-                              &partition_expr_ctxs_);
+    Status status = Expr::CreateExprTrees(
+        pool, sink.output_partition.partition_exprs, &partition_expr_ctxs_);
     DCHECK(status.ok());
   }
 }
@@ -420,7 +422,8 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   DCHECK(!flushed_);
 
   if (batch->num_rows() == 0) return Status::OK();
-  if (broadcast_ || channels_.size() == 1) {
+  if (partition_type_ == TPartitionType::UNPARTITIONED
+      || channels_.size() == 1) {
     // current_thrift_batch_ is *not* the one that was written by the last call
     // to Serialize()
     RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
@@ -431,7 +434,7 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     }
     current_thrift_batch_ =
         (current_thrift_batch_ == &thrift_batch1_ ? &thrift_batch2_ : &thrift_batch1_);
-  } else if (random_) {
+  } else if (partition_type_ == TPartitionType::RANDOM) {
     // Round-robin batches among channels. Wait for the current channel to finish its
     // rpc before overwriting its batch.
     Channel* current_channel = channels_[current_channel_idx_];
@@ -439,8 +442,25 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     RETURN_IF_ERROR(SerializeBatch(batch, current_channel->thrift_batch()));
     RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch()));
     current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
+  } else if (partition_type_ == TPartitionType::KUDU) {
+    DCHECK_EQ(partition_expr_ctxs_.size(), 1);
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      int32_t partition =
+          *reinterpret_cast<int32_t*>(partition_expr_ctxs_[0]->GetValue(row));
+      if (partition < 0) {
+        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
+        partition = next_unknown_partition_;
+        ++next_unknown_partition_;
+      }
+      channels_[partition % num_channels]->AddRow(row);
+    }
   } else {
+    DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
     // hash-partition batch's rows across channels
+    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
+    // once we have codegen here.
     int num_channels = channels_.size();
     for (int i = 0; i < batch->num_rows(); ++i) {
       TupleRow* row = batch->GetRow(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 60b9025..d157577 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -104,8 +104,7 @@ class DataStreamSender : public DataSink {
   /// Sender instance id, unique within a fragment.
   int sender_id_;
   RuntimeState* state_;
-  bool broadcast_;  // if true, send all rows on all channels
-  bool random_; // if true, round-robins row batches among channels
+  TPartitionType::type partition_type_; // The type of partitioning to perform.
   int current_channel_idx_; // index of current channel to send to if random_ == true
 
   /// If true, this sender has called FlushFinal() successfully.
@@ -139,6 +138,10 @@ class DataStreamSender : public DataSink {
 
   /// Identifier of the destination plan node.
   PlanNodeId dest_node_id_;
+
+  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
+  /// or when errors are encountered.
+  int next_unknown_partition_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index bca5965..988a0b4 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -304,7 +304,8 @@ void Scheduler::ComputeFragmentExecParams(QuerySchedule* schedule) {
       const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
       DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
           || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-          || sink.output_partition.type == TPartitionType::RANDOM);
+          || sink.output_partition.type == TPartitionType::RANDOM
+          || sink.output_partition.type == TPartitionType::KUDU);
       PlanNodeId exch_id = sink.dest_node_id;
       int sender_id_base = dest_params->per_exch_num_senders[exch_id];
       dest_params->per_exch_num_senders[exch_id] +=

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/common/thrift/Exprs.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Exprs.thrift b/common/thrift/Exprs.thrift
index fc0f4ee..7c88f2b 100644
--- a/common/thrift/Exprs.thrift
+++ b/common/thrift/Exprs.thrift
@@ -37,7 +37,8 @@ enum TExprNodeType {
   TUPLE_IS_NULL_PRED,
   FUNCTION_CALL,
   AGGREGATE_EXPR,
-  IS_NOT_EMPTY_PRED
+  IS_NOT_EMPTY_PRED,
+  KUDU_PARTITION_EXPR
 }
 
 struct TBoolLiteral {
@@ -122,6 +123,18 @@ struct TAggregateExpr {
   2: required list<Types.TColumnType> arg_types;
 }
 
+// Expr used to call into the Kudu client to determine the partition index for rows. The
+// values for the partition columns are produced by its children.
+struct TKuduPartitionExpr {
+  // The Kudu table to use the partitioning scheme from.
+  1: required Types.TTableId target_table_id
+
+  // Mapping from the children of this expr to their column positions in the table, i.e.
+  // child(i) produces the value for column referenced_columns[i].
+  // TODO: Include the partition cols in the KuduTableDescriptor and remove this.
+  2: required list<i32> referenced_columns
+}
+
 // This is essentially a union over the subclasses of Expr.
 struct TExprNode {
   1: required TExprNodeType node_type
@@ -151,6 +164,7 @@ struct TExprNode {
   18: optional TDecimalLiteral decimal_literal
   19: optional TAggregateExpr agg_expr
   20: optional TTimestampLiteral timestamp_literal
+  21: optional TKuduPartitionExpr kudu_partition_expr
 }
 
 // A flattened representation of a tree of Expr nodes, obtained by depth-first

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/common/thrift/Partitions.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Partitions.thrift b/common/thrift/Partitions.thrift
index 0e918d1..efc9b08 100644
--- a/common/thrift/Partitions.thrift
+++ b/common/thrift/Partitions.thrift
@@ -32,7 +32,13 @@ enum TPartitionType {
 
   // ordered partition on a list of exprs
   // (partition bounds don't overlap)
-  RANGE_PARTITIONED
+  RANGE_PARTITIONED,
+
+  // use the partitioning scheme of a Kudu table
+  // TODO: this is a special case now because Kudu supports multilevel partition
+  // schemes. We should add something like lists of TDataPartitions to reflect that
+  // and then this can be removed. (IMPALA-5255)
+  KUDU
 }
 
 // Specification of how a single logical data stream is partitioned.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index b7172da..83c6e40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -106,9 +106,17 @@ public class InsertStmt extends StatementBase {
   // Set in analyze(). Contains metadata of target table to determine type of sink.
   private Table table_;
 
-  // Set in analyze(). Exprs corresponding to the partitionKeyValues.
+  // Set in analyze(). Exprs correspond to the partitionKeyValues, if specified, or to
+  // the partition columns for Kudu tables.
   private List<Expr> partitionKeyExprs_ = Lists.newArrayList();
 
+  // Set in analyze(). Maps exprs in partitionKeyExprs_ to their column's position in the
+  // table, e.g. partitionKeyExprs_[i] corresponds to table_.columns(partitionColPos_[i]).
+  // For Kudu tables, the primary keys are a leading subset of the cols, and the partition
+  // cols can be any subset of the primary keys, meaning that this list will be in
+  // ascending order from '0' to '# primary key cols - 1' but may leave out some numbers.
+  private List<Integer> partitionColPos_ = Lists.newArrayList();
+
   // Indicates whether this insert stmt has a shuffle or noshuffle plan hint.
   // Both flags may be false. Only one of them may be true, not both.
   // Shuffle forces data repartitioning before the data sink, and noshuffle
@@ -218,6 +226,7 @@ public class InsertStmt extends StatementBase {
     queryStmt_.reset();
     table_ = null;
     partitionKeyExprs_.clear();
+    partitionColPos_.clear();
     hasShuffleHint_ = false;
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
@@ -633,6 +642,11 @@ public class InsertStmt extends StatementBase {
     List<String> tmpPartitionKeyNames = new ArrayList<String>();
 
     int numClusteringCols = (tbl instanceof HBaseTable) ? 0 : tbl.getNumClusteringCols();
+    boolean isKuduTable = table_ instanceof KuduTable;
+    Set<String> kuduPartitionColumnNames = null;
+    if (isKuduTable) {
+      kuduPartitionColumnNames = ((KuduTable) table_).getPartitionColumnNames();
+    }
 
     // Check dynamic partition columns for type compatibility.
     for (int i = 0; i < selectListExprs.size(); ++i) {
@@ -643,6 +657,11 @@ public class InsertStmt extends StatementBase {
         // This is a dynamic clustering column
         tmpPartitionKeyExprs.add(compatibleExpr);
         tmpPartitionKeyNames.add(targetColumn.getName());
+      } else if (isKuduTable) {
+        if (kuduPartitionColumnNames.contains(targetColumn.getName())) {
+          tmpPartitionKeyExprs.add(compatibleExpr);
+          tmpPartitionKeyNames.add(targetColumn.getName());
+        }
       }
       selectListExprs.set(i, compatibleExpr);
     }
@@ -663,24 +682,28 @@ public class InsertStmt extends StatementBase {
     }
 
     // Reorder the partition key exprs and names to be consistent with the target table
-    // declaration.  We need those exprs in the original order to create the corresponding
-    // Hdfs folder structure correctly.
-    for (Column c: table_.getColumns()) {
+    // declaration, and store their column positions.  We need those exprs in the original
+    // order to create the corresponding Hdfs folder structure correctly, or the indexes
+    // to construct rows to pass to the Kudu partitioning API.
+    for (int i = 0; i < table_.getColumns().size(); ++i) {
+      Column c = table_.getColumns().get(i);
       for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
         if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
           partitionKeyExprs_.add(tmpPartitionKeyExprs.get(j));
+          partitionColPos_.add(i);
           break;
         }
       }
     }
 
-    Preconditions.checkState(partitionKeyExprs_.size() == numClusteringCols);
+    Preconditions.checkState(
+        (isKuduTable && partitionKeyExprs_.size() == kuduPartitionColumnNames.size())
+        || partitionKeyExprs_.size() == numClusteringCols);
     // Make sure we have stats for partitionKeyExprs
     for (Expr expr: partitionKeyExprs_) {
       expr.analyze(analyzer);
     }
 
-    boolean isKuduTable = table_ instanceof KuduTable;
     // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
     // order, and add NULL expressions to all missing columns, unless this is an UPSERT.
     ArrayList<Column> columns = table_.getColumnsInHiveOrder();
@@ -741,9 +764,13 @@ public class InsertStmt extends StatementBase {
   }
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
-    if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
+    if (planHints_.isEmpty()) return;
+    if (isUpsert_) {
+      throw new AnalysisException("Hints not supported in UPSERT statements.");
+    }
+    if (table_ instanceof HBaseTable || table_ instanceof KuduTable) {
       throw new AnalysisException(String.format("INSERT hints are only supported for " +
-          "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
+          "inserting into Hdfs tables: %s", getTargetTableName()));
     }
     boolean hasNoClusteredHint = false;
     for (PlanHint hint: planHints_) {
@@ -776,29 +803,17 @@ public class InsertStmt extends StatementBase {
   }
 
   private void analyzeSortByHint(PlanHint hint) throws AnalysisException {
-    // HBase tables don't support insert hints at all (must be enforced by the caller).
-    Preconditions.checkState(!(table_ instanceof HBaseTable));
-
-    if (isUpsert_) {
-      throw new AnalysisException("SORTBY hint is not supported in UPSERT statements.");
-    }
+    // HBase and Kudu tables don't support insert hints at all (must be enforced by the caller).
+    Preconditions.checkState(!(table_ instanceof HBaseTable || table_ instanceof KuduTable));
 
     List<String> columnNames = hint.getArgs();
     Preconditions.checkState(!columnNames.isEmpty());
     for (String columnName: columnNames) {
-      // Make sure it's not a Kudu primary key column or Hdfs partition column.
-      if (table_ instanceof KuduTable) {
-        KuduTable kuduTable = (KuduTable) table_;
-        if (kuduTable.isPrimaryKeyColumn(columnName)) {
-          throw new AnalysisException(String.format("SORTBY hint column list must not " +
-              "contain Kudu primary key column: '%s'", columnName));
-        }
-      } else {
-        for (Column tableColumn: table_.getClusteringColumns()) {
-          if (tableColumn.getName().equals(columnName)) {
-            throw new AnalysisException(String.format("SORTBY hint column list must " +
-                "not contain Hdfs partition column: '%s'", columnName));
-          }
+      // Make sure it's not an Hdfs partition column.
+      for (Column tableColumn: table_.getClusteringColumns()) {
+        if (tableColumn.getName().equals(columnName)) {
+          throw new AnalysisException(String.format("SORTBY hint column list must " +
+              "not contain Hdfs partition column: '%s'", columnName));
         }
       }
 
@@ -843,6 +858,7 @@ public class InsertStmt extends StatementBase {
    */
   public QueryStmt getQueryStmt() { return queryStmt_; }
   public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
+  public List<Integer> getPartitionColPos() { return partitionColPos_; }
   public boolean hasShuffleHint() { return hasShuffleHint_; }
   public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
   public boolean hasClusteredHint() { return hasClusteredHint_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
new file mode 100644
index 0000000..88644e2
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.thrift.TExprNodeType;
+import org.apache.impala.thrift.TKuduPartitionExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Internal expr that calls into the Kudu client to determine the partition index for
+ * a given row. Returns -1 for rows that do not correspond to a partition. The children of
+ * this Expr produce the values for the partition columns.
+ */
+public class KuduPartitionExpr extends Expr {
+  private final static Logger LOG = LoggerFactory.getLogger(KuduPartitionExpr.class);
+
+  // The table to use the partitioning scheme from.
+  private final int targetTableId_;
+  // Maps from this Expr's children to column positions in the table, i.e. children_[i]
+  // produces the value for column partitionColPos_[i].
+  private List<Integer> partitionColPos_;
+
+  public KuduPartitionExpr(
+      int targetTableId, List<Expr> partitionKeyExprs, List<Integer> partitionKeyIdxs) {
+    Preconditions.checkState(partitionKeyExprs.size() == partitionKeyIdxs.size());
+    targetTableId_ = targetTableId;
+    partitionColPos_ = partitionKeyIdxs;
+    children_.addAll(Expr.cloneList(partitionKeyExprs));
+  }
+
+  /**
+   * Copy c'tor used in clone().
+   */
+  protected KuduPartitionExpr(KuduPartitionExpr other) {
+    super(other);
+    targetTableId_ = other.targetTableId_;
+    partitionColPos_ = other.partitionColPos_;
+  }
+
+  @Override
+  protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+    type_ = Type.INT;
+  }
+
+  @Override
+  protected String toSqlImpl() {
+    StringBuilder sb = new StringBuilder("KuduPartition(");
+    for (int i = 0; i < children_.size(); ++i) {
+      if (i != 0) sb.append(", ");
+      sb.append(children_.get(i).toSql());
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  @Override
+  protected void toThrift(TExprNode msg) {
+    msg.node_type = TExprNodeType.KUDU_PARTITION_EXPR;
+    msg.kudu_partition_expr = new TKuduPartitionExpr();
+    for (int i = 0; i < children_.size(); ++i) {
+      msg.kudu_partition_expr.addToReferenced_columns(partitionColPos_.get(i));
+    }
+    msg.kudu_partition_expr.setTarget_table_id(targetTableId_);
+  }
+
+  @Override
+  public Expr clone() { return new KuduPartitionExpr(this); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 5cf5fd1..af3fb46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -150,6 +151,14 @@ public class KuduTable extends Table {
     return ImmutableList.copyOf(partitionBy_);
   }
 
+  public Set<String> getPartitionColumnNames() {
+    Set<String> ret = new HashSet<String>();
+    for (KuduPartitionParam partitionParam : partitionBy_) {
+      ret.addAll(partitionParam.getColumnNames());
+    }
+    return ret;
+  }
+
   /**
    * Returns the range-based partitioning of this table if it exists, null otherwise.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/DataPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataPartition.java b/fe/src/main/java/org/apache/impala/planner/DataPartition.java
index 9e942e4..67a411a 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataPartition.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataPartition.java
@@ -49,7 +49,8 @@ public class DataPartition {
     Preconditions.checkNotNull(exprs);
     Preconditions.checkState(!exprs.isEmpty());
     Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
-        || type == TPartitionType.RANGE_PARTITIONED);
+        || type == TPartitionType.RANGE_PARTITIONED
+        || type == TPartitionType.KUDU);
     type_ = type;
     partitionExprs_ = exprs;
   }
@@ -71,6 +72,10 @@ public class DataPartition {
     return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
   }
 
+  public static DataPartition kuduPartitioned(Expr expr) {
+    return new DataPartition(TPartitionType.KUDU, Lists.newArrayList(expr));
+  }
+
   public boolean isPartitioned() { return type_ != TPartitionType.UNPARTITIONED; }
   public boolean isHashPartitioned() { return type_ == TPartitionType.HASH_PARTITIONED; }
   public TPartitionType getType() { return type_; }
@@ -123,6 +128,7 @@ public class DataPartition {
       case HASH_PARTITIONED: return "HASH";
       case RANGE_PARTITIONED: return "RANGE";
       case UNPARTITIONED: return "UNPARTITIONED";
+      case KUDU: return "KUDU";
       default: return "";
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 85a8ab8..e0e325c 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -23,10 +23,13 @@ import java.util.List;
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
@@ -201,15 +204,19 @@ public class DistributedPlanner {
     // Ignore constants for the sake of partitioning.
     Expr.removeConstants(partitionExprs);
 
-    // Do nothing if the input fragment is already appropriately partitioned.
+    // Do nothing if the input fragment is already appropriately partitioned. TODO: handle
+    // Kudu tables here (IMPALA-5254).
     DataPartition inputPartition = inputFragment.getDataPartition();
-    if (!partitionExprs.isEmpty() &&
-        analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)) {
+    if (!partitionExprs.isEmpty()
+        && analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)
+        && !(insertStmt.getTargetTable() instanceof KuduTable)) {
       return inputFragment;
     }
 
-    // Make a cost-based decision only if no user hint was supplied.
-    if (!insertStmt.hasShuffleHint()) {
+    // Make a cost-based decision only if no user hint was supplied and this is not a Kudu
+    // table. TODO: consider making a cost based decision for Kudu tables.
+    if (!insertStmt.hasShuffleHint()
+        && !(insertStmt.getTargetTable() instanceof KuduTable)) {
       // If the existing partition exprs are a subset of the table partition exprs, check
       // if it is distributed across all nodes. If so, don't repartition.
       if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
@@ -238,6 +245,11 @@ public class DistributedPlanner {
     DataPartition partition;
     if (partitionExprs.isEmpty()) {
       partition = DataPartition.UNPARTITIONED;
+    } else if (insertStmt.getTargetTable() instanceof KuduTable) {
+      Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
+          partitionExprs, insertStmt.getPartitionColPos());
+      kuduPartitionExpr.analyze(ctx_.getRootAnalyzer());
+      partition = DataPartition.kuduPartitioned(kuduPartitionExpr);
     } else {
       partition = DataPartition.hashPartitioned(partitionExprs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index ad5bba5..418e0c6 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -24,10 +24,12 @@ import java.util.List;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.catalog.HBaseTable;
@@ -38,6 +40,7 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
@@ -493,12 +496,17 @@ public class Planner {
        Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = Lists.newArrayList();
 
-    if (insertStmt.hasClusteredHint()) {
-      if (insertStmt.getTargetTable() instanceof KuduTable) {
+    if (insertStmt.getTargetTable() instanceof KuduTable) {
+      if (inputFragment.getDataPartition().getType() == TPartitionType.KUDU) {
+        Preconditions.checkState(
+            inputFragment.getDataPartition().getPartitionExprs().size() == 1);
+        // Only sort for Kudu if we've already partitioned so that we can sort the
+        // partitions separately. This will be true if this is a distributed exec.
+        orderingExprs.add(inputFragment.getDataPartition().getPartitionExprs().get(0));
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
-      } else {
-        orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
       }
+    } else if (insertStmt.hasClusteredHint()) {
+      orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
     }
     orderingExprs.addAll(insertStmt.getSortByExprs());
     // Ignore constants for the sake of clustering.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 8595eea..13e79c3 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -119,8 +119,6 @@ public abstract class TableSink extends DataSink {
     } else if (table instanceof KuduTable) {
       // Kudu doesn't have a way to perform INSERT OVERWRITE.
       Preconditions.checkState(overwrite == false);
-      // Partition clauses don't make sense for Kudu inserts.
-      Preconditions.checkState(partitionKeyExprs.isEmpty());
       // sortby() hint is not supported for Kudu tables.
       Preconditions.checkState(sortByColumns.isEmpty());
       return new KuduTableSink(table, sinkAction, referencedColumns);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index c6a71e3..88297fe 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1736,8 +1736,14 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " +
+          "INSERT hints are only supported for inserting into Hdfs tables: " +
           "functional_hbase.alltypes");
+      // Plan hints do not make sense for inserting into Kudu tables.
+      AnalysisError(String.format(
+          "insert into table functional_kudu.alltypes %sshuffle%s " +
+          "select * from functional_kudu.alltypes", prefix, suffix),
+          "INSERT hints are only supported for inserting into Hdfs tables: " +
+          "functional_kudu.alltypes");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +
@@ -1791,14 +1797,6 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "partition (year, month) %ssortby(year)%s select * from " +
           "functional.alltypes", prefix, suffix),
           "SORTBY hint column list must not contain Hdfs partition column: 'year'");
-      // Column in sortby hint must not be a Kudu primary key column.
-      AnalysisError(String.format("insert into functional_kudu.alltypessmall " +
-          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
-          "SORTBY hint column list must not contain Kudu primary key column: 'id'");
-      // sortby() hint is not supported in UPSERT queries
-      AnalysisError(String.format("upsert into functional_kudu.alltypessmall " +
-          "%ssortby(id)%s select * from functional_kudu.alltypes", prefix, suffix),
-          "SORTBY hint is not supported in UPSERT statements.");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
index 118b322..2a20c2a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -60,12 +60,9 @@ public class AnalyzeUpsertStmtTest extends AnalyzerTest {
         "from functional.alltypes a, functional.allcomplextypes b, " +
         "(select item from b.int_array_col) v1 " +
         "where a.id = b.id");
-    // Hint
-    AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * from " +
-        "functional_kudu.testtbl");
-    // Incorrect hint, results in warning
-    AnalyzesOk("upsert into table functional_kudu.testtbl [badhint] select * from " +
-        "functional_kudu.testtbl", "INSERT hint not recognized: badhint");
+    // Hints are not supported in UPSERT statements.
+    AnalysisError("upsert into table functional_kudu.testtbl [clustered] select * from " +
+        "functional_kudu.testtbl", "Hints not supported in UPSERT statements.");
 
     // Key columns missing from permutation
     AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index bbcb014..e04278d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -7,6 +7,16 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=1/24 files=1 size=20.36KB
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+02:SORT
+|  order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(bigint_col))]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=1/24 files=1 size=20.36KB
 ====
 # simple upsert with values clause
 upsert into table functional_kudu.testtbl
@@ -41,6 +51,11 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
+08:SORT
+|  order by: KuduPartition(a.bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
+|
+07:EXCHANGE [KUDU(KuduPartition(a.bigint_col))]
+|
 03:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.string_col = string_col
 |  runtime filters: RF000 <- string_col
@@ -79,14 +94,99 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: id, string_col
+|  having: CAST(count(*) AS INT) < 10
+|
+02:EXCHANGE [HASH(id,string_col)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: id, string_col
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
 ====
-upsert into functional_kudu.testtbl /*+ clustered */
+upsert into functional_kudu.testtbl
 select * from functional_kudu.testtbl
 ---- PLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
+00:SCAN KUDU [functional_kudu.testtbl]
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+02:SORT
+|  order by: KuduPartition(functional_kudu.testtbl.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(functional_kudu.testtbl.id))]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====
+# upsert with a union
+upsert into functional_kudu.testtbl select * from functional_kudu.testtbl where id % 3 = 0
+union all select * from functional_kudu.testtbl where id % 3 = 1
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+00:UNION
+|
+|--02:SCAN KUDU [functional_kudu.testtbl]
+|     predicates: id % 3 = 1
+|
+01:SCAN KUDU [functional_kudu.testtbl]
+   predicates: id % 3 = 0
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.testtbl]
+|
+04:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+03:EXCHANGE [KUDU(KuduPartition(id))]
+|
+00:UNION
+|
+|--02:SCAN KUDU [functional_kudu.testtbl]
+|     predicates: id % 3 = 1
+|
+01:SCAN KUDU [functional_kudu.testtbl]
+   predicates: id % 3 = 0
+====
+# upsert with agg on col that is already partitioned in the input and target table
+# TODO: we shouldn't need to do any repartitioning here (IMPALA-5254).
+upsert into functional_kudu.alltypes
+select id, true, 0, 0, 0, 0, 0, 0, '', '', '', 0, 0 from functional_kudu.alltypes group by id
+---- PLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:AGGREGATE [FINALIZE]
+|  group by: id
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
+|
+03:AGGREGATE [FINALIZE]
+|  group by: id
+|
+02:EXCHANGE [HASH(id)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: id
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index a978b8b..12ab9fc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -50,6 +50,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+02:SORT
+|  order by: KuduPartition(10) ASC NULLS LAST, 10 ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(10))]
+|
 00:UNION
    constant-operands=1
 ====
@@ -61,6 +66,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+02:SORT
+|  order by: KuduPartition(int_col) ASC NULLS LAST, int_col ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(int_col))]
+|
 00:SCAN KUDU [functional_kudu.tinyinttable]
 ====
 insert into functional_kudu.testtbl(id, name)
@@ -80,6 +90,11 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
+06:SORT
+|  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
+|
+05:EXCHANGE [KUDU(KuduPartition(count(id)))]
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(id)
 |  group by: name
@@ -240,26 +255,23 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes]
    kudu predicates: id < 1475059775, id > 1475059665
 ====
-# IMPALA-2521: clustered insert into table adds sort node.
-insert into table functional_kudu.alltypes /*+ clustered */
+insert into table functional_kudu.alltypes
 select * from functional_kudu.alltypes
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
-|
 00:SCAN KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
-|  order by: id ASC NULLS LAST
+02:SORT
+|  order by: KuduPartition(functional_kudu.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+01:EXCHANGE [KUDU(KuduPartition(functional_kudu.alltypes.id))]
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ====
-# IMPALA-2521: clustered insert into table adds sort node, correctly substituting exprs.
-insert into table functional_kudu.testtbl /*+ clustered */
+insert into table functional_kudu.testtbl
 select id, name, maxzip as zip
 from (
 select id, max(zip) as maxzip, name
@@ -268,9 +280,6 @@ from functional_kudu.testtbl group by id, name
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
-|  order by: id ASC NULLS LAST
-|
 01:AGGREGATE [FINALIZE]
 |  output: max(zip)
 |  group by: id, name
@@ -279,8 +288,10 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-04:SORT
-|  order by: id ASC NULLS LAST
+05:SORT
+|  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
+|
+04:EXCHANGE [KUDU(KuduPartition(id))]
 |
 03:AGGREGATE [FINALIZE]
 |  output: max:merge(zip)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/801c95f3/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index f2b12b1..71224ce 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -278,42 +278,6 @@ NumModifiedRows: 1
 NumRowErrors: 7299
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table.
-create table impala_2521
-(id bigint primary key, name string, zip int)
-partition by hash partitions 3 stored as kudu
----- RESULTS
-====
----- QUERY
-insert into impala_2521 /*+ clustered */
-select id, name, maxzip as zip
-from (
-select tinyint_col as id, cast(max(int_col) + 1 as int) as maxzip, string_col as name
-from functional_kudu.alltypessmall group by id, name
-) as sub;
----- RESULTS
-: 10
----- RUNTIME_PROFILE
-NumModifiedRows: 10
-NumRowErrors: 0
-====
----- QUERY
-select * from impala_2521
----- RESULTS
-0,'0',1
-1,'1',2
-2,'2',3
-3,'3',4
-4,'4',5
-5,'5',6
-6,'6',7
-7,'7',8
-8,'8',9
-9,'9',10
----- TYPES
-BIGINT,STRING,INT
-====
----- QUERY
 # Table with all supported types as primary key and distribution columns
 create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name string,
   valf float, vald double, primary key (i1, i2, i3, i4, name)) partition by


[2/2] incubator-impala git commit: IMPALA-5162, IMPALA-5163: stress test support on secure clusters

Posted by mi...@apache.org.
IMPALA-5162, IMPALA-5163: stress test support on secure clusters

This patch adds support for running the stress test
(concurrent_select.py) and loading nested data (load_nested.py) into a
Kerberized, SSL-enabled Impala cluster. It assumes the calling user
already has a valid Kerberos ticket. One way to do that (sketched below) is:

1. Get access to a keytab and krb5.config
2. Set KRB5_CONFIG and KRB5CCNAME appropriately
3. Run kinit(1)
4. Run load_nested.py and/or concurrent_select.py within this
   environment.
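
A minimal sketch of that setup, assuming a keytab is available (the paths,
principal, and credential cache below are hypothetical; --use-ssl is the
flag added by this patch, and --use-kerberos is assumed to be the flag
provided by add_kerberos_options):

  import os
  import subprocess

  # Point Kerberos at the cluster's config and a private credential cache.
  os.environ["KRB5_CONFIG"] = "/path/to/krb5.conf"    # hypothetical path
  os.environ["KRB5CCNAME"] = "/tmp/krb5cc_stress"     # hypothetical cache

  # Obtain a ticket from the keytab (assumes kinit is on the PATH).
  subprocess.check_call(
      ["kinit", "-kt", "/path/to/user.keytab", "user@EXAMPLE.COM"])

  # Run the loader in the same environment against the secure cluster.
  subprocess.check_call(
      ["./testdata/bin/load_nested.py", "--use-kerberos", "--use-ssl"])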

Because our Python clients already support Kerberos and SSL, we simply
need to make sure to use the correct options when calling the entry
points and initializing the clients (a short example follows this list):

Impala: Impyla
Hive: Impyla
HDFS: hdfs.ext.kerberos.KerberosClient
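
For illustration, a minimal sketch of initializing those clients with the
new options (host names and ports are hypothetical, and the remaining
connection arguments are assumed to keep their defaults; the keyword
arguments shown match the constructors touched by this patch):

  from tests.comparison.db_connection import HiveConnection, ImpalaConnection

  # Impala over HiveServer2 with GSSAPI auth and TLS.
  impala_conn = ImpalaConnection(
      host_name="impalad-1.example.com",  # hypothetical host
      port=21050,                         # hypothetical HS2 port
      user_name="stress",
      use_kerberos=True,
      use_ssl=True)

  # Hive uses the same Impyla-based connection class.
  hive_conn = HiveConnection(
      host_name="hs2.example.com",        # hypothetical host
      port=10000,                         # hypothetical HS2 port
      user_name="stress",
      use_kerberos=True,
      use_ssl=True)

The HDFS client is built the same way inside cluster.py, with use_kerberos
and use_ssl read off the cluster object.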

With this patch, I was able to manually do a short concurrent_select.py
run against a secure cluster without connection or auth errors, and I
was able to do the same with load_nested.py for a cluster that already
had TPC-H loaded.

Follow-ons for future cleanup work:

IMPALA-5263: support CA bundles when running stress test against SSL'd
             Impala

IMPALA-5264: fix InsecurePlatformWarning under stress test with SSL

Change-Id: I0daad57bb8ceeb5071b75125f11c1997ed7e0179
Reviewed-on: http://gerrit.cloudera.org:8080/6763
Reviewed-by: Matthew Mulder <mm...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8b459dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8b459dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8b459dff

Branch: refs/heads/master
Commit: 8b459dffec9e093e87da9ab6e8b2e5a9de50a7bd
Parents: 801c95f
Author: Michael Brown <mi...@cloudera.com>
Authored: Fri Mar 31 10:39:54 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 2 04:56:01 2017 +0000

----------------------------------------------------------------------
 testdata/bin/load_nested.py       |  2 ++
 tests/comparison/cli_options.py   |  9 ++++++
 tests/comparison/cluster.py       | 57 +++++++++++++++++++++++++---------
 tests/comparison/db_connection.py |  9 ++++--
 tests/stress/concurrent_select.py |  2 +-
 5 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/testdata/bin/load_nested.py
----------------------------------------------------------------------
diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index b44bd04..146c0ff 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -298,6 +298,8 @@ if __name__ == "__main__":
   parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)  # --cm-host and similar args added here
+  cli_options.add_kerberos_options(parser)
+  cli_options.add_ssl_options(parser)
 
   parser.add_argument("-s", "--source-db", default="tpch_parquet")
   parser.add_argument("-t", "--target-db", default="tpch_nested_parquet")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 714d899..ca2efc6 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -156,6 +156,13 @@ def add_cm_options(parser):
       help='If CM manages multiple clusters, use this to specify which cluster to use.')
 
 
+def add_ssl_options(parser):
+  group = parser.add_argument_group('SSL Options')
+  group.add_argument(
+      '--use-ssl', action='store_true', default=False,
+      help='Use SSL to connect')
+
+
 def create_cluster(args):
   if args.cm_host:
     cluster = CmCluster(
@@ -167,6 +174,8 @@ def create_cluster(args):
   else:
     cluster = MiniCluster(args.hive_host, args.hive_port, args.minicluster_num_impalads)
   cluster.hadoop_user_name = args.hadoop_user_name
+  cluster.use_kerberos = getattr(args, 'use_kerberos', False)
+  cluster.use_ssl = getattr(args, 'use_ssl', False)
   return cluster
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index ab1e4f3..87a4009 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -44,6 +44,7 @@ from urlparse import urlparse
 from xml.etree.ElementTree import parse as parse_xml
 from zipfile import ZipFile
 
+
 from db_connection import HiveConnection, ImpalaConnection
 from tests.common.errors import Timeout
 from tests.util.shell_util import shell as local_shell
@@ -71,7 +72,8 @@ class Cluster(object):
     self._hadoop_configs = None
     self._local_hadoop_conf_dir = None
     self.hadoop_user_name = getuser()
-    self.is_kerberized = False
+    self.use_kerberos = False
+    self.use_ssl = False
 
     self._hdfs = None
     self._yarn = None
@@ -360,9 +362,18 @@ class Hdfs(Service):
     """Returns an HdfsClient."""
     endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address",
                                               "0.0.0.0:50070")
-    if endpoint.startswith("0.0.0.0"):
-      endpoint.replace("0.0.0.0", "127.0.0.1")
-    return HdfsClient("http://%s" % endpoint, use_kerberos=False,
+    ip, port = endpoint.split(':')
+    if ip == "0.0.0.0":
+      ip = "127.0.0.1"
+    if self.cluster.use_ssl:
+      port = self.cluster.get_hadoop_config("dfs.https.port", 20102)
+      scheme = 'https'
+    else:
+      scheme = 'http'
+    endpoint = ':'.join([ip, port])
+    return HdfsClient(
+        "{scheme}://{endpoint}".format(scheme=scheme, endpoint=endpoint),
+        use_kerberos=self.cluster.use_kerberos,
         user_name=(self._admin_user_name if as_admin else self.cluster.hadoop_user_name))
 
   def ensure_home_dir(self, user=None):
@@ -381,12 +392,16 @@ class Hdfs(Service):
 class HdfsClient(object):
 
   def __init__(self, url, user_name=None, use_kerberos=False):
+    # Set a specific session that doesn't verify SSL certs. This is needed because
+    # requests doesn't like self-signed certs.
+    # TODO: Support a CA bundle.
+    s = requests.Session()
+    s.verify = False
     if use_kerberos:
-      # TODO: Have the virtualenv attempt to install a list of optional libs.
       try:
-        import kerberos
+        from hdfs.ext.kerberos import KerberosClient
       except ImportError as e:
-        if "No module named kerberos" not in str(e):
+        if "No module named requests_kerberos" not in str(e):
           raise e
         import os
         import subprocess
@@ -394,16 +409,18 @@ class HdfsClient(object):
         pip_path = os.path.join(os.environ["IMPALA_HOME"], "infra", "python", "env",
             "bin", "pip")
         try:
-          local_shell(pip_path + " install kerboros", stdout=subprocess.PIPE,
-              stderr=subprocess.STDOUT)
+          local_shell(pip_path + " install pykerberos==1.1.14 requests-kerberos==0.11.0",
+                      stdout=subprocess.PIPE,
+                      stderr=subprocess.STDOUT)
           LOG.info("kerberos installation complete.")
         except Exception as e:
           LOG.error("kerberos installation failed. Try installing libkrb5-dev and"
               " then try again.")
           raise e
-      self._client = hdfs.ext.kerberos.KerberosClient(url, user=user_name)
+      from hdfs.ext.kerberos import KerberosClient
+      self._client = KerberosClient(url, session=s)
     else:
-      self._client = hdfs.client.InsecureClient(url, user=user_name)
+      self._client = hdfs.client.InsecureClient(url, user=user_name, session=s)
 
   def __getattr__(self, name):
     return getattr(self._client, name)
@@ -463,7 +480,7 @@ class Hive(Service):
   def connect(self, db_name=None):
     conn = HiveConnection(host_name=self.hs2_host_name, port=self.hs2_port,
         user_name=self.cluster.hadoop_user_name, db_name=db_name,
-        use_kerberos=self.cluster.is_kerberized)
+        use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
     conn.cluster = self.cluster
     return conn
 
@@ -497,7 +514,7 @@ class Impala(Service):
       impalad = choice(self.impalads)
     conn = ImpalaConnection(host_name=impalad.host_name, port=impalad.hs2_port,
         user_name=self.cluster.hadoop_user_name, db_name=db_name,
-        use_kerberos=self.cluster.is_kerberized)
+        use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
     conn.cluster = self.cluster
     return conn
 
@@ -734,9 +751,19 @@ class Impalad(object):
     return data
 
   def _request_web_page(self, relative_url, params={}, timeout_secs=DEFAULT_TIMEOUT):
-    url = "http://%s:%s%s" % (self.host_name, self.web_ui_port, relative_url)
+    if self.cluster.use_ssl:
+      scheme = 'https'
+    else:
+      scheme = 'http'
+    url = '{scheme}://{host}:{port}{url}'.format(
+        scheme=scheme,
+        host=self.host_name,
+        port=self.web_ui_port,
+        url=relative_url)
     try:
-      resp = requests.get(url, params=params, timeout=timeout_secs)
+      # verify=False is needed because of self-signed certificates
+      # TODO: support a CA bundle that users could point to instead
+      resp = requests.get(url, params=params, timeout=timeout_secs, verify=False)
     except requests.exceptions.Timeout as e:
       raise Timeout(underlying_exception=e)
     resp.raise_for_status()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index 7117bff..8722755 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -851,11 +851,13 @@ class ImpalaConnection(DbConnection):
 
   _DB_TYPE = IMPALA
   _CURSOR_CLASS = ImpalaCursor
+  _KERBEROS_SERVICE_NAME = 'impala'
   _NON_KERBEROS_AUTH_MECH = 'NOSASL'
 
-  def __init__(self, use_kerberos=False, **kwargs):
+  def __init__(self, use_kerberos=False, use_ssl=False, **kwargs):
     self._use_kerberos = use_kerberos
     self.cluster = None
+    self._use_ssl = use_ssl
     DbConnection.__init__(self, **kwargs)
 
   def clone(self, db_name):
@@ -884,7 +886,9 @@ class ImpalaConnection(DbConnection):
         password=self._password,
         database=self.db_name,
         timeout=(60 * 60),
-        auth_mechanism=('GSSAPI' if self._use_kerberos else self._NON_KERBEROS_AUTH_MECH))
+        auth_mechanism=('GSSAPI' if self._use_kerberos else self._NON_KERBEROS_AUTH_MECH),
+        kerberos_service_name=self._KERBEROS_SERVICE_NAME,
+        use_ssl=self._use_ssl)
 
 
 class HiveCursor(ImpalaCursor):
@@ -909,6 +913,7 @@ class HiveConnection(ImpalaConnection):
 
   _DB_TYPE = HIVE
   _CURSOR_CLASS = HiveCursor
+  _KERBEROS_SERVICE_NAME = 'hive'
   _NON_KERBEROS_AUTH_MECH = 'PLAIN'
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 2283608..7aa44b7 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -1650,6 +1650,7 @@ def main():
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)
   cli_options.add_kerberos_options(parser)
+  cli_options.add_ssl_options(parser)
   parser.add_argument(
       "--runtime-info-path",
       default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
@@ -1825,7 +1826,6 @@ def main():
             query_option=query_option, value=value))
 
   cluster = cli_options.create_cluster(args)
-  cluster.is_kerberized = args.use_kerberos
   impala = cluster.impala
   if impala.find_stopped_impalads():
     impala.restart()