You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/21 22:22:08 UTC

[1/6] impala git commit: [security] test and fixes for TLS socket EINTR issues

Repository: impala
Updated Branches:
  refs/heads/2.x 78a3dd819 -> 9d7b2103f


[security] test and fixes for TLS socket EINTR issues

SSL_{read,write}() can return SSL_ERROR_WANT_{READ,WRITE},
respectively, when a signal interrupts recv()/send() calls, even if
SSL_MODE_AUTO_RETRY is set in the TLS context.  To handle that
properly in the Socket::Blocking{Recv,Write}() methods, return
NetworkError() with the appropriate POSIX error code from
TlsSocket::{Recv,Write}().

As a by-product, this changelist fixes flakiness in the TestUniqueClientIds
scenario of ClientStressTest and in other flaky tests which failed
with errors like the one below:

  Bad status: IO error: Could not connect to the cluster: \
    Client connection negotiation failed: client connection to \
    IP:port: Read zero bytes on a blocking Recv() call: \
    Transferred 0 of 4 bytes

Prior to this fix, the test failure ratio observed with dist-test
for TSAN builds was about 6% in multiple 1K runs.  After the fix,
no failures were observed.

Change-Id: Ibec9049186f79f1c43295e4735538ed7ba4e516e
Reviewed-on: http://gerrit.cloudera.org:8080/8462
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9360
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b5d28234
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b5d28234
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b5d28234

Branch: refs/heads/2.x
Commit: b5d28234290fefc7693e04481407f863fba3d846
Parents: 78a3dd8
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Nov 3 12:39:25 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 01:37:23 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/CMakeLists.txt   |   1 +
 be/src/kudu/security/tls_handshake.cc |   1 -
 be/src/kudu/security/tls_socket.cc    |   9 ++
 be/src/kudu/util/net/socket.cc        |  27 +++-
 security/tls_socket-test.cc           | 196 +++++++++++++++++++++++++++++
 5 files changed, 227 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b5d28234/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 8736e66..d3539de 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -122,5 +122,6 @@ if (NOT NO_TESTS)
   ADD_KUDU_TEST(crypto-test)
   ADD_KUDU_TEST(test/mini_kdc-test)
   ADD_KUDU_TEST(tls_handshake-test)
+  ADD_KUDU_TEST(tls_socket-test)
   ADD_KUDU_TEST(token-test)
 endif()

http://git-wip-us.apache.org/repos/asf/impala/blob/b5d28234/be/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake.cc b/be/src/kudu/security/tls_handshake.cc
index fdf3bb1..e1314bc 100644
--- a/be/src/kudu/security/tls_handshake.cc
+++ b/be/src/kudu/security/tls_handshake.cc
@@ -117,7 +117,6 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
     DCHECK_GE(send->size(), 0);
     return Status::OK();
   }
-  DCHECK_GT(send->size(), 0);
   return Status::Incomplete("TLS Handshake incomplete");
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b5d28234/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 7aeca31..f725a49 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -52,9 +52,14 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
 
   errno = 0;
   int32_t bytes_written = SSL_write(ssl_.get(), buf, amt);
+  int save_errno = errno;
   if (bytes_written <= 0) {
     auto error_code = SSL_get_error(ssl_.get(), bytes_written);
     if (error_code == SSL_ERROR_WANT_WRITE) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_write error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Socket not ready to write yet.
       *nwritten = 0;
       return Status::OK();
@@ -102,6 +107,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
     }
     auto error_code = SSL_get_error(ssl_.get(), bytes_read);
     if (error_code == SSL_ERROR_WANT_READ) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_read error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Nothing available to read yet.
       *nread = 0;
       return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/b5d28234/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
index 60f3d4b..1603da7 100644
--- a/be/src/kudu/util/net/socket.cc
+++ b/be/src/kudu/util/net/socket.cc
@@ -27,6 +27,7 @@
 
 #include <limits>
 #include <string>
+#include <type_traits>
 
 #include <glog/logging.h>
 
@@ -55,6 +56,14 @@ DEFINE_bool_hidden(socket_inject_short_recvs, false,
 TAG_FLAG(socket_inject_short_recvs, hidden);
 TAG_FLAG(socket_inject_short_recvs, unsafe);
 
+// TODO(todd) consolidate with other copies of this!
+// Retry on EINTR for functions like read() that return -1 on error.
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value == true, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
 namespace kudu {
 
 Socket::Socket()
@@ -339,20 +348,25 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
   if (flags & FLAG_NONBLOCKING) {
     accept_flags |= SOCK_NONBLOCK;
   }
-  new_conn->Reset(::accept4(fd_, (struct sockaddr*)&addr,
-                  &olen, accept_flags));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept4(fd_, (struct sockaddr*)&addr,
+                             &olen, accept_flags));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept4(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
+
 #else
-  new_conn->Reset(::accept(fd_, (struct sockaddr*)&addr, &olen));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept(fd_, (struct sockaddr*)&addr, &olen));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
   RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
   RETURN_NOT_OK(new_conn->SetCloseOnExec());
 #endif // defined(__linux__)
@@ -509,7 +523,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
   }
 
   DCHECK_GE(fd_, 0);
-  int res = ::recv(fd_, buf, amt, 0);
+  int res;
+  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
   if (res <= 0) {
     if (res == 0) {
       return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN);

http://git-wip-us.apache.org/repos/asf/impala/blob/b5d28234/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
new file mode 100644
index 0000000..a978e68
--- /dev/null
+++ b/security/tls_socket-test.cc
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <pthread.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+
+namespace kudu {
+namespace security {
+
+
+class TlsSocketTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    ASSERT_OK(client_tls_.Init());
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+  }
+
+ protected:
+  TlsContext client_tls_;
+  TlsContext server_tls_;
+};
+
+Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
+  tls->set_verification_mode(TlsVerificationMode::VERIFY_NONE);
+
+  bool done = false;
+  string received;
+  while (!done) {
+    string to_send;
+    Status s = tls->Continue(received, &to_send);
+    if (s.ok()) {
+      done = true;
+    } else if (!s.IsIncomplete()) {
+      RETURN_NOT_OK_PREPEND(s, "unexpected tls error");
+    }
+    if (!to_send.empty()) {
+      size_t nwritten;
+      auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(10);
+      RETURN_NOT_OK_PREPEND(sock->BlockingWrite(
+          reinterpret_cast<const uint8_t*>(to_send.data()),
+          to_send.size(), &nwritten, deadline),
+                            "error sending");
+    }
+
+    if (!done) {
+      uint8_t buf[1024];
+      int32_t n = 0;
+      RETURN_NOT_OK_PREPEND(sock->Recv(buf, arraysize(buf), &n),
+                            "error receiving");
+      received = string(reinterpret_cast<char*>(&buf[0]), n);
+    }
+  }
+  LOG(INFO) << side << ": negotiation complete";
+  return Status::OK();
+}
+
+void handler(int /* signal */) {}
+
+// Test for failures to handle EINTR during TLS connection
+// negotiation and data send/receive.
+TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  Sockaddr listen_addr;
+  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
+  Socket listener;
+  ASSERT_OK(listener.Init(0));
+  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
+  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
+
+  // Set up a no-op signal handler for SIGUSR2.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+
+  // Size is big enough to not fit into output socket buffer of default size
+  // (controlled by setsockopt() with SO_SNDBUF).
+  constexpr size_t kSize = 32 * 1024 * 1024;
+
+  pthread_t server_tid;
+  CountDownLatch server_tid_sync(1);
+  std::atomic<bool> stop { false };
+  thread server([&] {
+      server_tid = pthread_self();
+      server_tid_sync.CountDown();
+      unique_ptr<Socket> sock(new Socket());
+      Sockaddr remote;
+      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
+
+      TlsHandshake server;
+      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+      CHECK_OK(server.Finish(&sock));
+
+      CHECK_OK(sock->SetRecvTimeout(kTimeout));
+      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+      // An "echo" loop for kSize byte buffers.
+      while (!stop) {
+        size_t n;
+        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
+        if (s.ok()) {
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+        }
+        if (!s.ok()) {
+          CHECK(stop) << "unexpected error: " << s.ToString();
+        }
+      }
+    });
+  SCOPED_CLEANUP({ server.join(); });
+
+  // Start a thread to send signals to the server thread.
+  thread killer([&]() {
+    server_tid_sync.Wait();
+    while (!stop) {
+      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
+      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+    }
+  });
+  SCOPED_CLEANUP({ killer.join(); });
+
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(listen_addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  for (int i = 0; i < 10; i++) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    size_t nwritten;
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+        MonoTime::Now() + kTimeout));
+    size_t n;
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+        MonoTime::Now() + kTimeout));
+  }
+  stop = true;
+  ASSERT_OK(client_sock->Close());
+
+  LOG(INFO) << "client done";
+}
+
+} // namespace security
+} // namespace kudu


[2/6] impala git commit: IMPALA-5801: Clean up codegen GetType() interface

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 4123b1f..e075beb 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -270,6 +270,9 @@ class PhjBuilder : public DataSink {
     return spillable_buffer_size_ * (num_reserved_buffers - 2) + max_row_buffer_size_ * 2;
   }
 
+  /// Class name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
+
  private:
   /// Create and initialize a set of hash partitions for partitioning level 'level'.
   /// The previous hash partitions must have been cleared with ClearHashPartitions().
@@ -503,9 +506,6 @@ class PhjBuilder : public DataSink {
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;
-
-  /// Class name in LLVM IR.
-  static const char* LLVM_CLASS_NAME;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index e76a9ba..99e07eb 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -1295,18 +1295,13 @@ string PartitionedHashJoinNode::NodeDebugString() const {
 // }
 Status PartitionedHashJoinNode::CodegenCreateOutputRow(
     LlvmCodeGen* codegen, llvm::Function** fn) {
-  llvm::Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  llvm::PointerType* tuple_row_ptr_type = llvm::PointerType::get(tuple_row_type, 0);
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
 
-  llvm::Type* this_type = codegen->GetType(BlockingJoinNode::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  llvm::PointerType* this_ptr_type = llvm::PointerType::get(this_type, 0);
+  llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<BlockingJoinNode>();
 
   // TupleRows are really just an array of pointers.  Easier to work with them
   // this way.
-  llvm::PointerType* tuple_row_working_type =
-      llvm::PointerType::get(codegen->ptr_type(), 0);
+  llvm::PointerType* tuple_row_working_type = codegen->ptr_ptr_type();
 
   // Construct function signature to match CreateOutputRow()
   LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
@@ -1331,7 +1326,7 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(
 
   // Copy probe row
   codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, probe_tuple_row_size_);
-  llvm::Value* build_row_idx[] = {codegen->GetIntConstant(TYPE_INT, num_probe_tuples)};
+  llvm::Value* build_row_idx[] = {codegen->GetI32Constant(num_probe_tuples)};
   llvm::Value* build_row_dst =
       builder.CreateInBoundsGEP(out_row_arg, build_row_idx, "build_dst_ptr");
 
@@ -1354,7 +1349,7 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(
     builder.SetInsertPoint(build_null_block);
     for (int i = 0; i < num_build_tuples; ++i) {
       llvm::Value* array_idx[] = {
-          codegen->GetIntConstant(TYPE_INT, i + num_probe_tuples)};
+          codegen->GetI32Constant(i + num_probe_tuples)};
       llvm::Value* dst =
           builder.CreateInBoundsGEP(out_row_arg, array_idx, "dst_tuple_ptr");
       builder.CreateStore(codegen->null_ptr_value(), dst);
@@ -1432,8 +1427,7 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   llvm::Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
   DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
   DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
-  prefetch_mode_arg->replaceAllUsesWith(
-      llvm::ConstantInt::get(llvm::Type::getInt32Ty(codegen->context()), prefetch_mode));
+  prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
 
   // Codegen HashTable::Equals
   llvm::Function* probe_equals_fn;

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/text-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index ddd0d6e..9e919a3 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -133,10 +133,10 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
   llvm::PointerType* tuple_ptr_type = tuple_type->getPointerTo();
 
   LlvmCodeGen::FnPrototype prototype(
-      codegen, "WriteSlot", codegen->GetType(TYPE_BOOLEAN));
+      codegen, "WriteSlot", codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_arg", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("data", codegen->ptr_type()));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("len", codegen->GetType(TYPE_INT)));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("len", codegen->i32_type()));
 
   LlvmBuilder builder(codegen->context());
   llvm::Value* args[3];
@@ -161,7 +161,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
           llvm::ArrayRef<llvm::Value*>(
               {args[1], args[2], codegen->CastPtrToLlvmPtr(codegen->ptr_type(),
                                      const_cast<char*>(null_col_val)),
-                  codegen->GetIntConstant(TYPE_INT, len)}));
+                  codegen->GetI32Constant(len)}));
     }
   } else {
     // Constant FALSE as branch condition. We rely on later optimization passes
@@ -175,7 +175,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     builder.SetInsertPoint(check_zero_block);
     // If len == 0 and it is not a string col, set slot to NULL
     llvm::Value* null_len =
-        builder.CreateICmpEQ(args[2], codegen->GetIntConstant(TYPE_INT, 0));
+        builder.CreateICmpEQ(args[2], codegen->GetI32Constant(0));
     builder.CreateCondBr(null_len, set_null_block, parse_slot_block);
   }
 
@@ -193,7 +193,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     DCHECK(slot_desc->type().type != TYPE_CHAR);
     if (slot_desc->type().type == TYPE_VARCHAR) {
       // determine if we need to truncate the string
-      llvm::Value* maxlen = codegen->GetIntConstant(TYPE_INT, slot_desc->type().len);
+      llvm::Value* maxlen = codegen->GetI32Constant(slot_desc->type().len);
       llvm::Value* len_lt_maxlen =
           builder.CreateICmpSLT(args[2], maxlen, "len_lt_maxlen");
       llvm::Value* minlen =
@@ -260,7 +260,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     llvm::BasicBlock *parse_success_block, *parse_failed_block;
     codegen->CreateIfElseBlocks(*fn, "parse_success", "parse_fail",
         &parse_success_block, &parse_failed_block);
-    LlvmCodeGen::NamedVariable parse_result("parse_result", codegen->GetType(TYPE_INT));
+    LlvmCodeGen::NamedVariable parse_result("parse_result", codegen->i32_type());
     llvm::Value* parse_result_ptr = codegen->CreateEntryBlockAlloca(*fn, parse_result);
 
     llvm::CallInst* parse_return;
@@ -269,8 +269,8 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     if (slot_desc->type().type == TYPE_DECIMAL) {
       // Special case for decimal since it has additional precision/scale parameters
       parse_return = builder.CreateCall(parse_fn, {args[1], args[2],
-          codegen->GetIntConstant(TYPE_INT, slot_desc->type().precision),
-          codegen->GetIntConstant(TYPE_INT, slot_desc->type().scale), parse_result_ptr});
+          codegen->GetI32Constant(slot_desc->type().precision),
+          codegen->GetI32Constant(slot_desc->type().scale), parse_result_ptr});
     } else if (slot_desc->type().type == TYPE_TIMESTAMP) {
       // If the return value is large (more than 16 bytes in our toolchain) the first
       // parameter would be a pointer to value parsed and the return value of callee
@@ -282,7 +282,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     }
     llvm::Value* parse_result_val = builder.CreateLoad(parse_result_ptr, "parse_result");
     llvm::Value* failed_value =
-        codegen->GetIntConstant(TYPE_INT, StringParser::PARSE_FAILURE);
+        codegen->GetI32Constant(StringParser::PARSE_FAILURE);
 
     // Check for parse error.
     llvm::Value* parse_failed =
@@ -290,7 +290,7 @@ Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     if (strict_mode) {
       // In strict_mode, also check if parse_result is PARSE_OVERFLOW.
       llvm::Value* overflow_value =
-          codegen->GetIntConstant(TYPE_INT, StringParser::PARSE_OVERFLOW);
+          codegen->GetI32Constant(StringParser::PARSE_OVERFLOW);
       llvm::Value* parse_overflow =
           builder.CreateICmpEQ(parse_result_val, overflow_value, "overflowed");
       parse_failed = builder.CreateOr(parse_failed, parse_overflow, "failed_or");

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exprs/compound-predicates.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/compound-predicates.cc b/be/src/exprs/compound-predicates.cc
index 4d89cfa..55bc5c6 100644
--- a/be/src/exprs/compound-predicates.cc
+++ b/be/src/exprs/compound-predicates.cc
@@ -213,7 +213,7 @@ Status CompoundPredicate::CodegenComputeFn(
 
   // not-NULL block
   builder.SetInsertPoint(not_null_block);
-  llvm::PHINode* not_null_phi = builder.CreatePHI(codegen->GetType(TYPE_BOOLEAN), 3);
+  llvm::PHINode* not_null_phi = builder.CreatePHI(codegen->bool_type(), 3);
   if (and_fn) {
     not_null_phi->addIncoming(codegen->false_value(), lhs_null_rhs_not_null_block);
     not_null_phi->addIncoming(codegen->false_value(), lhs_not_null_rhs_null_block);
@@ -227,11 +227,11 @@ Status CompoundPredicate::CodegenComputeFn(
 
   // Ret/merge block
   builder.SetInsertPoint(ret_block);
-  llvm::PHINode* is_null_phi = builder.CreatePHI(codegen->boolean_type(), 2, "is_null");
+  llvm::PHINode* is_null_phi = builder.CreatePHI(codegen->bool_type(), 2, "is_null");
   is_null_phi->addIncoming(codegen->true_value(), null_block);
   is_null_phi->addIncoming(codegen->false_value(), not_null_block);
 
-  llvm::PHINode* val_phi = builder.CreatePHI(codegen->boolean_type(), 2, "val");
+  llvm::PHINode* val_phi = builder.CreatePHI(codegen->bool_type(), 2, "val");
   val_phi->addIncoming(codegen->false_value(), null_block);
   val_phi->addIncoming(not_null_phi, not_null_block);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exprs/scalar-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index 035dad5..42c2867 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -367,9 +367,9 @@ llvm::Function* ScalarExpr::CreateIrFunctionPrototype(
   LlvmCodeGen::FnPrototype prototype(codegen, name, return_type);
   prototype.AddArgument(
       LlvmCodeGen::NamedVariable(
-          "eval", codegen->GetPtrType(ScalarExprEvaluator::LLVM_CLASS_NAME)));
-  prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("row", codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME)));
+          "eval", codegen->GetStructPtrType<ScalarExprEvaluator>()));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable(
+      "row", codegen->GetStructPtrType<TupleRow>()));
   llvm::Function* function = prototype.GeneratePrototype(NULL, args[0]);
   DCHECK(function != NULL);
   return function;
@@ -389,8 +389,8 @@ Status ScalarExpr::GetCodegendComputeFnWrapper(
   llvm::BasicBlock* entry_block =
       llvm::BasicBlock::Create(codegen->context(), "entry", ir_compute_fn_);
   LlvmBuilder builder(entry_block);
-  llvm::Value* this_ptr =
-      codegen->CastPtrToLlvmPtr(codegen->GetPtrType(ScalarExpr::LLVM_CLASS_NAME), this);
+  llvm::Value* this_ptr = codegen->CastPtrToLlvmPtr(
+      codegen->GetStructPtrType<ScalarExpr>(), this);
   llvm::Value* compute_fn_args[] = {this_ptr, args[0], args[1]};
   llvm::Value* ret = CodegenAnyVal::CreateCall(
       codegen, &builder, static_getval_fn, compute_fn_args, "ret");

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index ffe8d27..acf6208 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -367,7 +367,7 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function**
       // Set 'child_fn' to the interpreted function
       child_fn = GetStaticGetValWrapper(children_[i]->type(), codegen);
       // First argument to interpreted function is children_[i]
-      llvm::Type* expr_ptr_type = codegen->GetPtrType(ScalarExpr::LLVM_CLASS_NAME);
+      llvm::Type* expr_ptr_type = codegen->GetStructPtrType<ScalarExpr>();
       child_fn_args.push_back(codegen->CastPtrToLlvmPtr(expr_ptr_type, children_[i]));
     }
     child_fn_args.push_back(eval);
@@ -400,7 +400,7 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function**
     DCHECK_EQ(udf_args.size(), vararg_start_idx_ + 1);
     DCHECK_GE(GetNumChildren(), 1);
     // Add the number of varargs
-    udf_args.push_back(codegen->GetIntConstant(TYPE_INT, NumVarArgs()));
+    udf_args.push_back(codegen->GetI32Constant(NumVarArgs()));
     // Add all the accumulated vararg inputs as one input argument.
     llvm::PointerType* vararg_type =
         CodegenAnyVal::GetUnloweredPtrType(codegen, VarArgsType());

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exprs/slot-ref.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index d529013..00616b3 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -188,10 +188,10 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn)
   *fn = CreateIrFunctionPrototype("GetSlotRef", codegen, &args);
   llvm::Value* row_ptr = args[1];
 
-  llvm::Value* tuple_offset = llvm::ConstantInt::get(codegen->int_type(), tuple_idx_);
-  llvm::Value* slot_offset = llvm::ConstantInt::get(codegen->int_type(), slot_offset_);
-  llvm::Value* zero = llvm::ConstantInt::get(codegen->GetType(TYPE_TINYINT), 0);
-  llvm::Value* one = llvm::ConstantInt::get(codegen->GetType(TYPE_TINYINT), 1);
+  llvm::Value* tuple_offset = codegen->GetI32Constant(tuple_idx_);
+  llvm::Value* slot_offset = codegen->GetI32Constant(slot_offset_);
+  llvm::Value* zero = codegen->GetI8Constant(0);
+  llvm::Value* one = codegen->GetI8Constant(1);
 
   llvm::BasicBlock* entry_block = llvm::BasicBlock::Create(context, "entry", *fn);
   bool slot_is_nullable = null_indicator_offset_.bit_mask != 0;
@@ -206,7 +206,7 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn)
   LlvmBuilder builder(entry_block);
   // Get the tuple offset addr from the row
   llvm::Value* cast_row_ptr = builder.CreateBitCast(
-      row_ptr, llvm::PointerType::get(codegen->ptr_type(), 0), "cast_row_ptr");
+      row_ptr, codegen->ptr_ptr_type(), "cast_row_ptr");
   llvm::Value* tuple_addr =
       builder.CreateInBoundsGEP(cast_row_ptr, tuple_offset, "tuple_addr");
   // Load the tuple*
@@ -240,8 +240,8 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn)
   // Branch for slot != NULL
   builder.SetInsertPoint(get_slot_block);
   llvm::Value* slot_ptr = builder.CreateInBoundsGEP(tuple_ptr, slot_offset, "slot_addr");
-  llvm::Value* val_ptr =
-      builder.CreateBitCast(slot_ptr, codegen->GetPtrType(type_), "val_ptr");
+  llvm::Value* val_ptr = builder.CreateBitCast(slot_ptr,
+      codegen->GetSlotPtrType(type_), "val_ptr");
   // Depending on the type, load the values we need
   llvm::Value* val = NULL;
   llvm::Value* ptr = NULL;
@@ -256,18 +256,18 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn)
   } else if (type_.type == TYPE_FIXED_UDA_INTERMEDIATE) {
     // ptr and len are the slot and its fixed length.
     ptr = builder.CreateBitCast(val_ptr, codegen->ptr_type());
-    len = codegen->GetIntConstant(TYPE_INT, type_.len);
+    len = codegen->GetI32Constant(type_.len);
   } else if (type_.type == TYPE_TIMESTAMP) {
     llvm::Value* time_of_day_ptr =
         builder.CreateStructGEP(NULL, val_ptr, 0, "time_of_day_ptr");
     // Cast boost::posix_time::time_duration to i64
     llvm::Value* time_of_day_cast =
-        builder.CreateBitCast(time_of_day_ptr, codegen->GetPtrType(TYPE_BIGINT));
+        builder.CreateBitCast(time_of_day_ptr, codegen->i64_ptr_type());
     time_of_day = builder.CreateLoad(time_of_day_cast, "time_of_day");
     llvm::Value* date_ptr = builder.CreateStructGEP(NULL, val_ptr, 1, "date_ptr");
     // Cast boost::gregorian::date to i32
     llvm::Value* date_cast =
-        builder.CreateBitCast(date_ptr, codegen->GetPtrType(TYPE_INT));
+        builder.CreateBitCast(date_ptr, codegen->i32_ptr_type());
     date = builder.CreateLoad(date_cast, "date");
   } else {
     // val_ptr is a native type
@@ -278,7 +278,7 @@ Status SlotRef::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function** fn)
   // Return block
   builder.SetInsertPoint(ret_block);
   llvm::PHINode* is_null_phi =
-      builder.CreatePHI(codegen->tinyint_type(), 2, "is_null_phi");
+      builder.CreatePHI(codegen->i8_type(), 2, "is_null_phi");
   if (tuple_is_nullable_) is_null_phi->addIncoming(one, entry_block);
   if (check_slot_null_indicator_block != NULL) {
     is_null_phi->addIncoming(one, check_slot_null_indicator_block);

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index bd67689..bc983de 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -76,13 +76,13 @@ string NullIndicatorOffset::DebugString() const {
 
 llvm::Constant* NullIndicatorOffset::ToIR(LlvmCodeGen* codegen) const {
   llvm::StructType* null_indicator_offset_type =
-      static_cast<llvm::StructType*>(codegen->GetType(LLVM_CLASS_NAME));
+      codegen->GetStructType<NullIndicatorOffset>();
   // Populate padding at end of struct with zeroes.
   llvm::ConstantAggregateZero* zeroes =
       llvm::ConstantAggregateZero::get(null_indicator_offset_type);
   return llvm::ConstantStruct::get(null_indicator_offset_type,
-      {llvm::ConstantInt::get(codegen->int_type(), byte_offset),
-          llvm::ConstantInt::get(codegen->tinyint_type(), bit_mask),
+      {codegen->GetI32Constant(byte_offset),
+          codegen->GetI8Constant(bit_mask),
           zeroes->getStructElement(2)});
 }
 
@@ -634,10 +634,9 @@ llvm::Value* SlotDescriptor::CodegenIsNull(LlvmCodeGen* codegen, LlvmBuilder* bu
     const NullIndicatorOffset& null_indicator_offset, llvm::Value* tuple) {
   llvm::Value* null_byte =
       CodegenGetNullByte(codegen, builder, null_indicator_offset, tuple, nullptr);
-  llvm::Constant* mask =
-      llvm::ConstantInt::get(codegen->tinyint_type(), null_indicator_offset.bit_mask);
+  llvm::Constant* mask = codegen->GetI8Constant(null_indicator_offset.bit_mask);
   llvm::Value* null_mask = builder->CreateAnd(null_byte, mask, "null_mask");
-  llvm::Constant* zero = llvm::ConstantInt::get(codegen->tinyint_type(), 0);
+  llvm::Constant* zero = codegen->GetI8Constant(0);
   return builder->CreateICmpNE(null_mask, zero, "is_null");
 }
 
@@ -653,14 +652,12 @@ llvm::Value* SlotDescriptor::CodegenIsNull(LlvmCodeGen* codegen, LlvmBuilder* bu
 void SlotDescriptor::CodegenSetNullIndicator(
     LlvmCodeGen* codegen, LlvmBuilder* builder, llvm::Value* tuple, llvm::Value* is_null)
     const {
-  DCHECK_EQ(is_null->getType(), codegen->boolean_type());
+  DCHECK_EQ(is_null->getType(), codegen->bool_type());
   llvm::Value* null_byte_ptr;
   llvm::Value* null_byte =
       CodegenGetNullByte(codegen, builder, null_indicator_offset_, tuple, &null_byte_ptr);
-  llvm::Constant* mask =
-      llvm::ConstantInt::get(codegen->tinyint_type(), null_indicator_offset_.bit_mask);
-  llvm::Constant* not_mask =
-      llvm::ConstantInt::get(codegen->tinyint_type(), ~null_indicator_offset_.bit_mask);
+  llvm::Constant* mask = codegen->GetI8Constant(null_indicator_offset_.bit_mask);
+  llvm::Constant* not_mask = codegen->GetI8Constant(~null_indicator_offset_.bit_mask);
 
   llvm::ConstantInt* constant_is_null = llvm::dyn_cast<llvm::ConstantInt>(is_null);
   llvm::Value* result = nullptr;
@@ -677,7 +674,7 @@ void SlotDescriptor::CodegenSetNullIndicator(
     llvm::Value* byte_with_cleared_bit =
         builder->CreateAnd(null_byte, not_mask, "null_bit_cleared");
     llvm::Value* sign_extended_null =
-        builder->CreateSExt(is_null, codegen->tinyint_type());
+        builder->CreateSExt(is_null, codegen->i8_type());
     llvm::Value* bit_only = builder->CreateAnd(sign_extended_null, mask, "null_bit");
     result = builder->CreateOr(byte_with_cleared_bit, bit_only, "null_bit_set");
   }
@@ -690,7 +687,7 @@ llvm::Value* SlotDescriptor::CodegenGetNullByte(
     const NullIndicatorOffset& null_indicator_offset, llvm::Value* tuple,
     llvm::Value** null_byte_ptr) {
   llvm::Constant* byte_offset =
-      llvm::ConstantInt::get(codegen->int_type(), null_indicator_offset.byte_offset);
+      codegen->GetI32Constant(null_indicator_offset.byte_offset);
   llvm::Value* tuple_bytes = builder->CreateBitCast(tuple, codegen->ptr_type());
   llvm::Value* byte_ptr =
       builder->CreateInBoundsGEP(tuple_bytes, byte_offset, "null_byte_ptr");
@@ -716,18 +713,18 @@ llvm::StructType* TupleDescriptor::GetLlvmStruct(LlvmCodeGen* codegen) const {
     if (slot->type().type == TYPE_CHAR) return nullptr;
     DCHECK_EQ(curr_struct_offset, slot->tuple_offset());
     slot->llvm_field_idx_ = struct_fields.size();
-    struct_fields.push_back(codegen->GetType(slot->type()));
+    struct_fields.push_back(codegen->GetSlotType(slot->type()));
     curr_struct_offset = slot->tuple_offset() + slot->slot_size();
   }
   // For each null byte, add a byte to the struct
   for (int i = 0; i < num_null_bytes_; ++i) {
-    struct_fields.push_back(codegen->GetType(TYPE_TINYINT));
+    struct_fields.push_back(codegen->i8_type());
     ++curr_struct_offset;
   }
 
   DCHECK_LE(curr_struct_offset, byte_size_);
   if (curr_struct_offset < byte_size_) {
-    struct_fields.push_back(llvm::ArrayType::get(codegen->GetType(TYPE_TINYINT),
+    struct_fields.push_back(llvm::ArrayType::get(codegen->i8_type(),
         byte_size_ - curr_struct_offset));
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 787c3a4..627c7a4 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -330,15 +330,14 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_
   // void MaterializeExprs(Tuple* tuple, TupleRow* row, TupleDescriptor* desc,
   //     ScalarExprEvaluator** slot_materialize_exprs, MemPool* pool,
   //     StringValue** non_null_string_values, int* total_string_lengths)
-  llvm::PointerType* opaque_tuple_type = codegen->GetPtrType(Tuple::LLVM_CLASS_NAME);
-  llvm::PointerType* row_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
-  llvm::PointerType* desc_type = codegen->GetPtrType(TupleDescriptor::LLVM_CLASS_NAME);
+  llvm::PointerType* opaque_tuple_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* row_type = codegen->GetStructPtrType<TupleRow>();
+  llvm::PointerType* desc_type = codegen->GetStructPtrType<TupleDescriptor>();
   llvm::PointerType* expr_evals_type =
-      codegen->GetPtrType(codegen->GetPtrType(ScalarExprEvaluator::LLVM_CLASS_NAME));
-  llvm::PointerType* pool_type = codegen->GetPtrType(MemPool::LLVM_CLASS_NAME);
-  llvm::PointerType* string_values_type =
-      codegen->GetPtrType(codegen->GetPtrType(StringValue::LLVM_CLASS_NAME));
-  llvm::PointerType* int_ptr_type = codegen->GetPtrType(TYPE_INT);
+      codegen->GetStructPtrPtrType<ScalarExprEvaluator>();
+  llvm::PointerType* pool_type = codegen->GetStructPtrType<MemPool>();
+  llvm::PointerType* string_values_type = codegen->GetStructPtrPtrType<StringValue>();
+  llvm::PointerType* int_ptr_type = codegen->i32_ptr_type();
   LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeExprs", codegen->void_type());
   prototype.AddArgument("opaque_tuple", opaque_tuple_type);
   prototype.AddArgument("row", row_type);
@@ -401,20 +400,18 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool collect_string_
 
 Status Tuple::CodegenCopyStrings(
     LlvmCodeGen* codegen, const TupleDescriptor& desc, llvm::Function** copy_strings_fn) {
-  llvm::PointerType* opaque_tuple_type = codegen->GetPtrType(Tuple::LLVM_CLASS_NAME);
-  llvm::PointerType* runtime_state_type =
-      codegen->GetPtrType(RuntimeState::LLVM_CLASS_NAME);
-  llvm::StructType* slot_offsets_type =
-      static_cast<llvm::StructType*>(codegen->GetType(SlotOffsets::LLVM_CLASS_NAME));
-  llvm::PointerType* pool_type = codegen->GetPtrType(MemPool::LLVM_CLASS_NAME);
-  llvm::PointerType* status_type = codegen->GetPtrType(Status::LLVM_CLASS_NAME);
+  llvm::PointerType* opaque_tuple_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* runtime_state_type = codegen->GetStructPtrType<RuntimeState>();
+  llvm::StructType* slot_offsets_type = codegen->GetStructType<SlotOffsets>();
+  llvm::PointerType* pool_type = codegen->GetStructPtrType<MemPool>();
+  llvm::PointerType* status_type = codegen->GetStructPtrType<Status>();
   LlvmCodeGen::FnPrototype prototype(
-      codegen, "CopyStringsWrapper", codegen->boolean_type());
+      codegen, "CopyStringsWrapper", codegen->bool_type());
   prototype.AddArgument("opaque_tuple", opaque_tuple_type);
   prototype.AddArgument("err_ctx", codegen->ptr_type());
   prototype.AddArgument("state", runtime_state_type);
   prototype.AddArgument("slot_offsets", codegen->GetPtrType(slot_offsets_type));
-  prototype.AddArgument("num_string_slots", codegen->int_type());
+  prototype.AddArgument("num_string_slots", codegen->i32_type());
   prototype.AddArgument("pool", pool_type);
   prototype.AddArgument("status", status_type);
 
@@ -440,8 +437,7 @@ Status Tuple::CodegenCopyStrings(
   }
   llvm::Constant* constant_slot_offsets = codegen->ConstantsToGVArrayPtr(
       slot_offsets_type, slot_offset_ir_constants, "slot_offsets");
-  llvm::Constant* num_string_slots =
-      llvm::ConstantInt::get(codegen->int_type(), desc.string_slots().size());
+  llvm::Constant* num_string_slots = codegen->GetI32Constant(desc.string_slots().size());
   // Get SlotOffsets* pointer to the first element of the constant array.
   llvm::Value* constant_slot_offsets_first_element_ptr =
       builder.CreateConstGEP2_64(constant_slot_offsets, 0, 0);
@@ -460,9 +456,9 @@ Status Tuple::CodegenCopyStrings(
 
 llvm::Constant* SlotOffsets::ToIR(LlvmCodeGen* codegen) const {
   return llvm::ConstantStruct::get(
-      static_cast<llvm::StructType*>(codegen->GetType(LLVM_CLASS_NAME)),
+      codegen->GetStructType<SlotOffsets>(),
       {null_indicator_offset.ToIR(codegen),
-          llvm::ConstantInt::get(codegen->int_type(), tuple_offset)});
+          codegen->GetI32Constant(tuple_offset)});
 }
 
 template void Tuple::MaterializeExprs<false, false>(TupleRow*, const TupleDescriptor&,

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/runtime/types.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/types.cc b/be/src/runtime/types.cc
index d2b215f..c18c66e 100644
--- a/be/src/runtime/types.cc
+++ b/be/src/runtime/types.cc
@@ -345,18 +345,16 @@ ostream& operator<<(ostream& os, const ColumnType& type) {
 
 llvm::ConstantStruct* ColumnType::ToIR(LlvmCodeGen* codegen) const {
   // ColumnType = { i32, i32, i32, i32, <vector>, <vector> }
-  llvm::StructType* column_type_type =
-      llvm::cast<llvm::StructType>(codegen->GetType(LLVM_CLASS_NAME));
+  llvm::StructType* column_type_type = codegen->GetStructType<ColumnType>();
 
   DCHECK_EQ(sizeof(type), sizeof(int32_t));
-  llvm::Constant* type_field = llvm::ConstantInt::get(codegen->int_type(), type);
+  llvm::Constant* type_field = codegen->GetI32Constant(type);
   DCHECK_EQ(sizeof(len), sizeof(int32_t));
-  llvm::Constant* len_field = llvm::ConstantInt::get(codegen->int_type(), len);
+  llvm::Constant* len_field = codegen->GetI32Constant(len);
   DCHECK_EQ(sizeof(precision), sizeof(int32_t));
-  llvm::Constant* precision_field =
-      llvm::ConstantInt::get(codegen->int_type(), precision);
+  llvm::Constant* precision_field = codegen->GetI32Constant(precision);
   DCHECK_EQ(sizeof(scale), sizeof(int32_t));
-  llvm::Constant* scale_field = llvm::ConstantInt::get(codegen->int_type(), scale);
+  llvm::Constant* scale_field = codegen->GetI32Constant(scale);
 
   // Create empty 'children' and 'field_names' vectors
   DCHECK(children.empty()) << "Nested types NYI";

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/util/tuple-row-compare.cc
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index ad92e84..5620dae 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -221,9 +221,9 @@ Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, llvm::Function**
   //     ScalarExprEvaluator** ordering_expr_evals_rhs,
   //     TupleRow* lhs, TupleRow* rhs)
   llvm::PointerType* expr_evals_type =
-      codegen->GetPtrPtrType(ScalarExprEvaluator::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_row_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
-  LlvmCodeGen::FnPrototype prototype(codegen, "Compare", codegen->int_type());
+      codegen->GetStructPtrPtrType<ScalarExprEvaluator>();
+  llvm::PointerType* tuple_row_type = codegen->GetStructPtrType<TupleRow>();
+  LlvmCodeGen::FnPrototype prototype(codegen, "Compare", codegen->i32_type());
   prototype.AddArgument("ordering_expr_evals_lhs", expr_evals_type);
   prototype.AddArgument("ordering_expr_evals_rhs", expr_evals_type);
   prototype.AddArgument("lhs", tuple_row_type);


[6/6] impala git commit: IMPALA-6269: Expose KRPC metrics on debug webpage

Posted by ta...@apache.org.
IMPALA-6269: Expose KRPC metrics on debug webpage

This change exposes KRPC metrics on the /rpcz debug web page.

This change also exposes metrics for rejected RPCs on the /metrics debug
web page. See here for an example: https://git.io/vAczm

This change also fixes a bug in PrettyPrinter::GetByteUnit(), which
previously did not work for unsigned values due to an implicit cast.

This change contains tests to check that the metrics show up in /rpcz
and /metrics and that they update as expected when executing queries.

This change is based on a change by Sailesh Mukil.

Change-Id: I7af7c1a84a5be82c979ca4ef1edf35167493be3f
Reviewed-on: http://gerrit.cloudera.org:8080/9292
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9d7b2103
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9d7b2103
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9d7b2103

Branch: refs/heads/2.x
Commit: 9d7b2103f5aaaec7013e5ec172a96a88d83dbbb9
Parents: 546d1dd
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jan 23 11:01:29 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 12:48:28 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc         | 103 +++++++++++++--
 be/src/rpc/impala-service-pool.h          |  26 ++--
 be/src/rpc/rpc-mgr-kerberized-test.cc     |   8 +-
 be/src/rpc/rpc-mgr-test-base.h            |  28 +++--
 be/src/rpc/rpc-mgr-test.cc                |  14 ++-
 be/src/rpc/rpc-mgr.cc                     |  52 +++++++-
 be/src/rpc/rpc-mgr.h                      |  15 ++-
 be/src/rpc/rpc-trace.cc                   |  19 ++-
 be/src/rpc/rpc-trace.h                    |   3 +-
 be/src/runtime/exec-env.cc                |   3 +-
 be/src/runtime/exec-env.h                 |   4 +-
 be/src/service/impalad-main.cc            |   2 +-
 be/src/util/histogram-metric.h            |  37 ++++--
 be/src/util/pretty-printer.h              |  51 +++++---
 common/thrift/Metrics.thrift              |   3 +-
 common/thrift/metrics.json                |  10 ++
 tests/custom_cluster/test_krpc_metrics.py |  97 +++++++++++++++
 tests/webserver/test_web_pages.py         |  30 +++++
 www/rpcz.tmpl                             | 166 ++++++++++++++++++++-----
 19 files changed, 547 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 35a5d6d..fb50656 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -31,30 +31,49 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_queue.h"
+#include "kudu/util/hdr_histogram.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 
 #include "common/names.h"
 #include "common/status.h"
 
-METRIC_DEFINE_histogram(server, impala_unused,
+METRIC_DEFINE_histogram(server, impala_incoming_queue_time,
     "RPC Queue Time",
     kudu::MetricUnit::kMicroseconds,
     "Number of microseconds incoming RPC requests spend in the worker queue",
     60000000LU, 3);
 
+using namespace rapidjson;
+
 namespace impala {
+// Metric key format for the per-service rpc queue overflow counter metric.
+const string RPC_QUEUE_OVERFLOW_METRIC_KEY = "rpc.$0.rpcs_queue_overflow";
 
 ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
-    size_t service_queue_length, kudu::rpc::ServiceIf* service,
+    size_t service_queue_length, kudu::rpc::GeneratedServiceIf* service,
     MemTracker* service_mem_tracker)
   : service_mem_tracker_(service_mem_tracker),
     service_(service),
     service_queue_(service_queue_length),
-    unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
+    incoming_queue_time_(METRIC_impala_incoming_queue_time.Instantiate(entity)) {
   DCHECK(service_mem_tracker_ != nullptr);
+  const TMetricDef& overflow_metric_def =
+      MetricDefs::Get(RPC_QUEUE_OVERFLOW_METRIC_KEY, service_->service_name());
+  rpcs_queue_overflow_ = ExecEnv::GetInstance()->rpc_metrics()->RegisterMetric(
+      new IntCounter(overflow_metric_def, 0L));
+  // Initialize additional histograms for each method of the service.
+  // TODO: Retrieve these from KRPC once KUDU-2313 has been implemented.
+  for (const auto& method : service_->methods_by_name()) {
+    const string& method_name = method.first;
+    string payload_size_name = Substitute("$0-payload-size", method_name);
+    payload_size_histograms_[method_name].reset(new HistogramMetric(
+        MakeTMetricDef(method_name, TMetricKind::HISTOGRAM, TUnit::BYTES),
+        1024 * 1024 * 1024, 3));
+  }
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -101,7 +120,7 @@ void ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
                  service_->service_name(),
                  c->remote_address().ToString(),
                  service_queue_.max_size());
-  rpcs_queue_overflow_.Add(1);
+  rpcs_queue_overflow_->Increment(1);
   FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
                     kudu::Status::ServiceUnavailable(err_msg), c);
   VLOG(1) << err_msg << " Contents of service queue:\n"
@@ -202,12 +221,11 @@ void ImpalaServicePool::RunThread() {
     }
 
     // We need to call RecordHandlingStarted() to update the InboundCall timing.
-    incoming->RecordHandlingStarted(unused_histogram_);
+    incoming->RecordHandlingStarted(incoming_queue_time_);
     ADOPT_TRACE(incoming->trace());
 
     if (UNLIKELY(incoming->ClientTimedOut())) {
       TRACE_TO(incoming->trace(), "Skipping call since client already timed out"); // NOLINT(*)
-      rpcs_timed_out_in_queue_.Add(1);
 
       // Respond as a failure, even though the client will probably ignore
       // the response anyway.
@@ -217,10 +235,13 @@ void ImpalaServicePool::RunThread() {
       continue;
     }
 
-    TRACE_TO(incoming->trace(), "Handling call"); // NOLINT(*)
+    const string& method_name = incoming->remote_method().method_name();
+    int64_t transfer_size = incoming->GetTransferSize();
+    payload_size_histograms_[method_name]->Update(transfer_size);
 
-    // Release the InboundCall pointer -- when the call is responded to,
-    // it will get deleted at that point.
+    TRACE_TO(incoming->trace(), "Handling call"); // NOLINT(*)
+    // Release the InboundCall pointer -- when the call is responded to, it will get
+    // deleted at that point.
     service_->Handle(incoming.release());
   }
 }
@@ -229,4 +250,68 @@ const string ImpalaServicePool::service_name() const {
   return service_->service_name();
 }
 
+// Render a kudu::Histogram into a human readable string representation.
+// TODO: Switch to structured JSON (IMPALA-6545).
+const string KrpcHistogramToString(const kudu::Histogram* histogram) {
+  DCHECK(histogram != nullptr);
+  DCHECK_EQ(histogram->prototype()->unit(), kudu::MetricUnit::kMicroseconds);
+  kudu::HdrHistogram snapshot(*histogram->histogram());
+  return HistogramMetric::HistogramToHumanReadable(&snapshot, TUnit::TIME_US);
+}
+
+// Expose the service pool metrics by storing them as JSON in 'value'.
+void ImpalaServicePool::ToJson(rapidjson::Value* value, rapidjson::Document* document) {
+  // Add pool metrics.
+  Value service_name_val(service_name().c_str(), document->GetAllocator());
+  value->AddMember("service_name", service_name_val, document->GetAllocator());
+  value->AddMember("queue_size", service_queue_.estimated_queue_length(),
+      document->GetAllocator());
+  value->AddMember("idle_threads", service_queue_.estimated_idle_worker_count(),
+      document->GetAllocator());
+  value->AddMember("rpcs_queue_overflow", rpcs_queue_overflow_->GetValue(),
+      document->GetAllocator());
+
+  Value mem_usage(PrettyPrinter::Print(service_mem_tracker_->consumption(),
+      TUnit::BYTES).c_str(), document->GetAllocator());
+  value->AddMember("mem_usage", mem_usage, document->GetAllocator());
+
+  Value mem_peak(PrettyPrinter::Print(service_mem_tracker_->peak_consumption(),
+      TUnit::BYTES).c_str(), document->GetAllocator());
+  value->AddMember("mem_peak", mem_peak, document->GetAllocator());
+
+  Value incoming_queue_time(KrpcHistogramToString(incoming_queue_time_.get()).c_str(),
+      document->GetAllocator());
+  value->AddMember("incoming_queue_time", incoming_queue_time,
+      document->GetAllocator());
+
+  // Add method specific metrics.
+  const kudu::rpc::GeneratedServiceIf::MethodInfoMap& method_infos =
+      service_->methods_by_name();
+  Value rpc_method_metrics(kArrayType);
+  for (const auto& method : method_infos) {
+    Value method_entry(kObjectType);
+
+    const string& method_name = method.first;
+    Value method_name_val(method_name.c_str(), document->GetAllocator());
+    method_entry.AddMember("method_name", method_name_val, document->GetAllocator());
+
+    kudu::rpc::RpcMethodInfo* method_info = method.second.get();
+    kudu::Histogram* handler_latency = method_info->handler_latency_histogram.get();
+    Value handler_latency_val(KrpcHistogramToString(handler_latency).c_str(),
+        document->GetAllocator());
+    method_entry.AddMember("handler_latency", handler_latency_val,
+        document->GetAllocator());
+
+    HistogramMetric* payload_size = payload_size_histograms_[method_name].get();
+    DCHECK(payload_size != nullptr);
+    Value payload_size_val(payload_size->ToHumanReadable().c_str(),
+        document->GetAllocator());
+    method_entry.AddMember("payload_size", payload_size_val, document->GetAllocator());
+
+    rpc_method_metrics.PushBack(method_entry, document->GetAllocator());
+  }
+  value->AddMember("rpc_method_metrics", rpc_method_metrics, document->GetAllocator());
+}
+
+
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 624e937..9d34366 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -46,7 +46,7 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of RPC
   /// payloads in the service queue.
   ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
-      size_t service_queue_length, kudu::rpc::ServiceIf* service,
+      size_t service_queue_length, kudu::rpc::GeneratedServiceIf* service,
       MemTracker* service_mem_tracker);
 
   virtual ~ImpalaServicePool();
@@ -57,13 +57,17 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// Shut down the queue and the thread pool.
   virtual void Shutdown();
 
-  kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method) override;
+  virtual kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method)
+    override;
 
   virtual kudu::Status
       QueueInboundCall(gscoped_ptr<kudu::rpc::InboundCall> call) OVERRIDE;
 
   const std::string service_name() const;
 
+  /// Expose the service pool metrics by storing them as JSON in 'value'.
+  void ToJson(rapidjson::Value* value, rapidjson::Document* document);
+
  private:
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
@@ -80,7 +84,7 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   MemTracker* const service_mem_tracker_;
 
   /// Reference to the implementation of the RPC handlers. Not owned.
-  kudu::rpc::ServiceIf* const service_;
+  kudu::rpc::GeneratedServiceIf* const service_;
 
   /// The set of service threads started to process incoming RPC calls.
   std::vector<std::unique_ptr<Thread>> threads_;
@@ -88,17 +92,15 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// The pending RPCs to be dequeued by the service threads.
   kudu::rpc::LifoServiceQueue service_queue_;
 
-  /// TODO: Display these metrics in the debug webpage. IMPALA-6269
-  /// Number of RPCs that timed out while waiting in the service queue.
-  AtomicInt32 rpcs_timed_out_in_queue_;
+  /// Histogram to track time spent by requests in the krpc incoming requests queue.
+  scoped_refptr<kudu::Histogram> incoming_queue_time_;
 
-  /// Number of RPCs that were rejected due to the queue being full.
-  AtomicInt32 rpcs_queue_overflow_;
+  /// Histogram for incoming request payload size for each method of this service.
+  std::unordered_map<std::string, std::unique_ptr<HistogramMetric>>
+      payload_size_histograms_;
 
-  /// Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
-  /// appropriate internal KRPC state. Unused otherwise.
-  /// TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
-  scoped_refptr<kudu::Histogram> unused_histogram_;
+  /// Number of RPCs that were rejected due to the queue being full. Not owned.
+  IntCounter* rpcs_queue_overflow_= nullptr;
 
   /// Protects against concurrent Shutdown() operations.
   /// TODO: This seems implausible given our current usage pattern.

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 08c1971..bb4b9db 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "rpc/rpc-mgr-test-base.h"
+#include "service/fe-support.h"
 
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -27,7 +28,7 @@ static int kdc_port = GetServerPort();
 
 class RpcMgrKerberizedTest :
     public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-  virtual void SetUp() {
+  virtual void SetUp() override {
     IpAddr ip;
     ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
     string spn = Substitute("impala-test/$0", ip);
@@ -42,7 +43,7 @@ class RpcMgrKerberizedTest :
     RpcMgrTestBase::SetUp();
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
     RpcMgrTestBase::TearDown();
   }
@@ -82,7 +83,8 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
 
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
index 4a79040..ce063f8 100644
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -26,6 +26,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/auth-provider.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "testutil/gtest-util.h"
 #include "testutil/mini-kdc-wrapper.h"
@@ -40,7 +41,7 @@
 
 #include "common/names.h"
 
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
 using kudu::rpc::RpcSidecar;
@@ -132,7 +133,7 @@ template <class T> class RpcMgrTestBase : public T {
 
   // Takes over ownership of the newly created 'service' which needs to have a lifetime
   // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
-  ServiceIf* TakeOverService(std::unique_ptr<ServiceIf> service) {
+  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
     services_.emplace_back(move(service));
     return services_.back().get();
   }
@@ -141,14 +142,15 @@ template <class T> class RpcMgrTestBase : public T {
   TNetworkAddress krpc_address_;
   RpcMgr rpc_mgr_;
 
-  virtual void SetUp() {
+  virtual void SetUp() override {
     IpAddr ip;
     ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
     krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    exec_env_.reset(new ExecEnv());
     ASSERT_OK(rpc_mgr_.Init());
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     rpc_mgr_.Shutdown();
   }
 
@@ -156,7 +158,10 @@ template <class T> class RpcMgrTestBase : public T {
   int32_t payload_[PAYLOAD_SIZE];
 
   // Own all the services used by the test.
-  std::vector<std::unique_ptr<ServiceIf>> services_;
+  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
+
+  // Required to set up the RPC metric groups used by the service pool.
+  std::unique_ptr<ExecEnv> exec_env_;
 };
 
 typedef std::function<void(RpcContext*)> ServiceCB;
@@ -169,8 +174,8 @@ class PingServiceImpl : public PingServiceIf {
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
     : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
 
-  virtual void Ping(
-      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
+      context) override {
     response->set_int_response(42);
     // Incoming requests will already be tracked and we need to release the memory.
     mem_tracker_.Release(context->GetTransferSize());
@@ -194,7 +199,7 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
   // The request comes with an int 'pattern' and a payload of int array sent with
   // sidecar. Scan the array to make sure every element matches 'pattern'.
   virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) {
+      RpcContext* context) override {
     int32_t pattern = request->pattern();
     Slice payload;
     ASSERT_OK(
@@ -228,15 +233,16 @@ template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
   // Test that a service can be started, and will respond to requests.
-  ServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
+  GeneratedServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
       rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
   RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
       static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
-  ServiceIf* scan_mem_impl = test_base->TakeOverService(make_unique<ScanMemServiceImpl>(
-      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  GeneratedServiceIf* scan_mem_impl = test_base->TakeOverService(
+      make_unique<ScanMemServiceImpl>(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker()));
   RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 8d5312f..e90838e 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -16,8 +16,9 @@
 // under the License.
 
 #include "rpc/rpc-mgr-test-base.h"
+#include "service/fe-support.h"
 
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
 using kudu::MonoDelta;
@@ -31,11 +32,11 @@ namespace impala {
 
 // For tests that do not require kerberized testing, we use RpcMgrTest.
 class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
-  virtual void SetUp() {
+  virtual void SetUp() override {
     RpcMgrTestBase::SetUp();
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     RpcMgrTestBase::TearDown();
   }
 };
@@ -178,7 +179,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  ServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
+  GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
       rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
@@ -204,7 +205,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  ServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
+  GeneratedServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
       rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
   ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
@@ -257,7 +258,8 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
 
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 7e05f38..ed9614e 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -31,14 +31,19 @@
 
 #include "common/names.h"
 
+using namespace rapidjson;
+
 using kudu::HostPort;
 using kudu::MetricEntity;
 using kudu::MonoDelta;
 using kudu::rpc::AcceptorPool;
+using kudu::rpc::DumpRunningRpcsRequestPB;
+using kudu::rpc::DumpRunningRpcsResponsePB;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
+using kudu::rpc::RpcConnectionPB;
 using kudu::rpc::RpcController;
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::Sockaddr;
 
 DECLARE_string(hostname);
@@ -127,7 +132,7 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-    ServiceIf* service_ptr, MemTracker* service_mem_tracker) {
+    GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
   scoped_refptr<ImpalaServicePool> service_pool = new ImpalaServicePool(
@@ -154,10 +159,9 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
   RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
 
   // Call the messenger to create an AcceptorPool for us.
-  shared_ptr<AcceptorPool> acceptor_pool;
-  KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool),
+  KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool_),
       "Failed to add acceptor pool");
-  KUDU_RETURN_IF_ERROR(acceptor_pool->Start(FLAGS_num_acceptor_threads),
+  KUDU_RETURN_IF_ERROR(acceptor_pool_->Start(FLAGS_num_acceptor_threads),
       "Acceptor pool failed to start");
   VLOG_QUERY << "Started " << FLAGS_num_acceptor_threads << " acceptor threads";
   services_started_ = true;
@@ -167,6 +171,7 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
 void RpcMgr::Shutdown() {
   if (messenger_.get() == nullptr) return;
   for (auto service_pool : service_pools_) service_pool->Shutdown();
+  acceptor_pool_.reset();
 
   messenger_->UnregisterAllServices();
   messenger_->Shutdown();
@@ -180,4 +185,41 @@ bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) {
       err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
 }
 
+void RpcMgr::ToJson(Document* document) {
+  if (messenger_.get() == nullptr) return;
+  // Add acceptor metrics.
+  int64_t num_accepted = 0;
+  if (acceptor_pool_.get() != nullptr) {
+    num_accepted = acceptor_pool_->num_rpc_connections_accepted();
+  }
+  document->AddMember("rpc_connections_accepted", num_accepted, document->GetAllocator());
+
+  // Add messenger metrics.
+  DumpRunningRpcsResponsePB response;
+  messenger_->DumpRunningRpcs(DumpRunningRpcsRequestPB(), &response);
+
+  int64_t num_inbound_calls_in_flight = 0;
+  for (const RpcConnectionPB& conn : response.inbound_connections()) {
+    num_inbound_calls_in_flight += conn.calls_in_flight().size();
+  }
+  document->AddMember("num_inbound_calls_in_flight", num_inbound_calls_in_flight,
+      document->GetAllocator());
+
+  int64_t num_outbound_calls_in_flight = 0;
+  for (const RpcConnectionPB& conn : response.outbound_connections()) {
+    num_outbound_calls_in_flight += conn.calls_in_flight().size();
+  }
+  document->AddMember("num_outbound_calls_in_flight", num_outbound_calls_in_flight,
+      document->GetAllocator());
+
+  // Add service pool metrics
+  Value services(kArrayType);
+  for (auto service_pool : service_pools_) {
+    Value service_entry(kObjectType);
+    service_pool->ToJson(&service_entry, document);
+    services.PushBack(service_entry, document->GetAllocator());
+  }
+  document->AddMember("services", services, document->GetAllocator());
+}
+
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index e87b559..c7107d2 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -29,7 +29,7 @@
 namespace kudu {
 namespace rpc {
 class RpcController;
-class ServiceIf;
+class GeneratedServiceIf;
 } // rpc
 } // kudu
 
@@ -95,6 +95,9 @@ namespace impala {
 /// Inbound connection set-up is handled by a small fixed-size pool of 'acceptor'
 /// threads. The number of threads that accept new TCP connection requests to the service
 /// port is configurable via FLAGS_num_acceptor_threads.
+///
+/// If 'use_tls' is true, then the underlying messenger is configured with the required
+/// certificates, and encryption is enabled and marked as required.
 class RpcMgr {
  public:
   RpcMgr(bool use_tls = false) : use_tls_(use_tls) {}
@@ -127,7 +130,7 @@ class RpcMgr {
   ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-      kudu::rpc::ServiceIf* service_ptr, MemTracker* service_mem_tracker)
+      kudu::rpc::GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 'address', and places
@@ -157,6 +160,11 @@ class RpcMgr {
 
   std::shared_ptr<kudu::rpc::Messenger> messenger() { return messenger_; }
 
+  /// Adds the RpcMgr's metrics to 'document' as JSON: the number of connections
+  /// accepted so far, the numbers of inbound and outbound calls in flight, and an array
+  /// named 'services' with metrics and histograms for each service and its methods.
+  void ToJson(rapidjson::Document* document);
+
   ~RpcMgr() {
     DCHECK_EQ(service_pools_.size(), 0)
         << "Must call Shutdown() before destroying RpcMgr";
@@ -175,6 +183,9 @@ class RpcMgr {
   /// track results for idempotent RPC calls.
   const scoped_refptr<kudu::rpc::ResultTracker> tracker_;
 
+  /// Holds a reference to the acceptor pool. Shared ownership with messenger_.
+  std::shared_ptr<kudu::rpc::AcceptorPool> acceptor_pool_;
+
   /// Container for reactor threads which run event loops for RPC services, plus acceptor
   /// threads which manage connection setup. Has to be a shared_ptr as required by
   /// MessengerBuilder::Build().

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-trace.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.cc b/be/src/rpc/rpc-trace.cc
index f4da19b..028f397 100644
--- a/be/src/rpc/rpc-trace.cc
+++ b/be/src/rpc/rpc-trace.cc
@@ -22,6 +22,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "rpc/rpc-mgr.h"
 #include "util/debug-util.h"
 #include "util/time.h"
 #include "util/webserver.h"
@@ -39,6 +40,8 @@ const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_d
 // web-based summary page.
 class RpcEventHandlerManager {
  public:
+  RpcEventHandlerManager(RpcMgr* rpc_mgr) : rpc_mgr_(rpc_mgr) {}
+
   // Adds an event handler to the list of those tracked
   void RegisterEventHandler(RpcEventHandler* event_handler);
 
@@ -64,14 +67,19 @@ class RpcEventHandlerManager {
   // after they are started, event handlers have a lifetime equivalent to the length of
   // the process.
   vector<RpcEventHandler*> event_handlers_;
+
+  // Points to an RpcMgr. If this is not null, then its metrics will be included in the
+  // output of JsonCallback. Not owned, but the object must be guaranteed to live as long
+  // as the process lives.
+  RpcMgr* rpc_mgr_ = nullptr;
 };
 
 // Only instance of RpcEventHandlerManager
 scoped_ptr<RpcEventHandlerManager> handler_manager;
 
-void impala::InitRpcEventTracing(Webserver* webserver) {
-  handler_manager.reset(new RpcEventHandlerManager());
-  if (webserver != NULL) {
+void impala::InitRpcEventTracing(Webserver* webserver, RpcMgr* rpc_mgr) {
+  handler_manager.reset(new RpcEventHandlerManager(rpc_mgr));
+  if (webserver != nullptr) {
     Webserver::UrlCallback json = bind<void>(
         mem_fn(&RpcEventHandlerManager::JsonCallback), handler_manager.get(), _1, _2);
     webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", json);
@@ -83,7 +91,7 @@ void impala::InitRpcEventTracing(Webserver* webserver) {
 }
 
 void RpcEventHandlerManager::RegisterEventHandler(RpcEventHandler* event_handler) {
-  DCHECK(event_handler != NULL);
+  DCHECK(event_handler != nullptr);
   lock_guard<mutex> l(lock_);
   event_handlers_.push_back(event_handler);
 }
@@ -98,6 +106,7 @@ void RpcEventHandlerManager::JsonCallback(const Webserver::ArgumentMap& args,
     servers.PushBack(server, document->GetAllocator());
   }
   document->AddMember("servers", servers, document->GetAllocator());
+  if (rpc_mgr_ != nullptr) rpc_mgr_->ToJson(document);
 }
 
 void RpcEventHandlerManager::ResetCallback(const Webserver::ArgumentMap& args,
@@ -137,7 +146,7 @@ void RpcEventHandler::ResetAll() {
 
 RpcEventHandler::RpcEventHandler(const string& server_name, MetricGroup* metrics) :
     server_name_(server_name), metrics_(metrics) {
-  if (handler_manager.get() != NULL) handler_manager->RegisterEventHandler(this);
+  if (handler_manager.get() != nullptr) handler_manager->RegisterEventHandler(this);
 }
 
 void RpcEventHandler::ToJson(Value* server, Document* document) {

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index 1bd7823..12c7395 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -31,6 +31,7 @@
 namespace impala {
 
 class MetricGroup;
+class RpcMgr;
 class Webserver;
 
 /// An RpcEventHandler is called every time an Rpc is started and completed. There is at
@@ -126,7 +127,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
 };
 
 /// Initialises rpc event tracing, must be called before any RpcEventHandlers are created.
-void InitRpcEventTracing(Webserver* webserver);
+void InitRpcEventTracing(Webserver* webserver, RpcMgr* = nullptr);
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 17b7bec..51eece5 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -67,7 +67,7 @@
 #include "common/names.h"
 
 using boost::algorithm::join;
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using namespace strings;
 
 DEFINE_bool_hidden(use_statestore, true, "Deprecated, do not use");
@@ -169,6 +169,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
     frontend_(new Frontend()),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     backend_address_(MakeNetworkAddress(hostname, backend_port)) {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 3f050c9..7741276 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -83,7 +83,7 @@ class ExecEnv {
       int subscriber_port, int webserver_port, const std::string& statestore_host,
       int statestore_port);
 
-  /// Returns the first created exec env instance. In a normal impalad, this is
+  /// Returns the most recently created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
   /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
@@ -123,6 +123,7 @@ class ExecEnv {
   io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
   Webserver* webserver() { return webserver_.get(); }
   MetricGroup* metrics() { return metrics_.get(); }
+  MetricGroup* rpc_metrics() { return rpc_metrics_; }
   MemTracker* process_mem_tracker() { return mem_tracker_.get(); }
   ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); }
   HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); }
@@ -210,6 +211,7 @@ class ExecEnv {
 
   /// Not owned by this class
   ImpalaServer* impala_server_ = nullptr;
+  MetricGroup* rpc_metrics_ = nullptr;
 
   bool enable_webserver_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index e0049d0..8b24a52 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -83,7 +83,7 @@ int ImpaladMain(int argc, char** argv) {
   ABORT_IF_ERROR(StartMemoryMaintenanceThread()); // Memory metrics are created in Init().
   ABORT_IF_ERROR(
       StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true));
-  InitRpcEventTracing(exec_env.webserver());
+  InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
   Status status =

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/util/histogram-metric.h
----------------------------------------------------------------------
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index a520947..d4e09e4 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -39,7 +39,7 @@ class HistogramMetric : public Metric {
     DCHECK_EQ(TMetricKind::HISTOGRAM, def.kind);
   }
 
-  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* value) {
+  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* value) override {
     rapidjson::Value container(rapidjson::kObjectType);
     AddStandardFields(document, &container);
 
@@ -84,36 +84,47 @@ class HistogramMetric : public Metric {
     histogram_.reset(new HdrHistogram(highest, digits));
   }
 
-  virtual void ToLegacyJson(rapidjson::Document*) { }
+  virtual void ToLegacyJson(rapidjson::Document*) override {}
 
   const TUnit::type& unit() const { return unit_; }
 
-  virtual std::string ToHumanReadable() {
+  virtual std::string ToHumanReadable() override {
     boost::lock_guard<SpinLock> l(lock_);
+    return HistogramToHumanReadable(histogram_.get(), unit_);
+  }
+
+  /// Render a HdrHistogram into a human readable string representation. The histogram
+  /// type is a template parameter so that it accepts both Impala's and Kudu's
+  /// HdrHistogram classes.
+  template <class T>
+  static std::string HistogramToHumanReadable(T* histogram, TUnit::type unit) {
+    DCHECK(histogram != nullptr);
     std::stringstream out;
-    out << "Count: " << histogram_->TotalCount() << ", "
-        << "min / max: " << PrettyPrinter::Print(histogram_->MinValue(), unit_)
-        << " / " << PrettyPrinter::Print(histogram_->MaxValue(), unit_) << ", "
+    out << "Count: " << histogram->TotalCount() << ", "
+        << "min / max: " << PrettyPrinter::Print(histogram->MinValue(), unit)
+        << " / " << PrettyPrinter::Print(histogram->MaxValue(), unit) << ", "
         << "25th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(25), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(25), unit) << ", "
         << "50th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(50), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(50), unit) << ", "
         << "75th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(75), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(75), unit) << ", "
         << "90th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(90), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(90), unit) << ", "
         << "95th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(95), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(95), unit) << ", "
         << "99.9th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(99.9), unit_);
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(99.9), unit);
     return out.str();
   }
 
  private:
-  // Protects histogram_ pointer itself.
+  /// Protects histogram_ pointer itself.
   SpinLock lock_;
   boost::scoped_ptr<HdrHistogram> histogram_;
   const TUnit::type unit_;
+
+  DISALLOW_COPY_AND_ASSIGN(HistogramMetric);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/be/src/util/pretty-printer.h
----------------------------------------------------------------------
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index 5e76db1..03ba99e 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -91,22 +91,12 @@ class PrettyPrinter {
       }
 
       case TUnit::TIME_NS: {
-        ss << std::setprecision(TIME_NS_PRECISION);
-        if (value >= BILLION) {
-          /// If the time is over a second, print it up to ms.
-          value /= MILLION;
-          PrintTimeMs(value, &ss);
-        } else if (value >= MILLION) {
-          /// if the time is over a ms, print it up to microsecond in the unit of ms.
-          ss << DOUBLE_TRUNCATE(static_cast<double>(value) / MILLION, TIME_NS_PRECISION)
-             << "ms";
-        } else if (value > 1000) {
-          /// if the time is over a microsecond, print it using unit microsecond
-          ss << DOUBLE_TRUNCATE(static_cast<double>(value) / 1000, TIME_NS_PRECISION)
-             << "us";
-        } else {
-          ss << DOUBLE_TRUNCATE(value, TIME_NS_PRECISION) << "ns";
-        }
+        PrintTimeNs(value, &ss);
+        break;
+      }
+
+      case TUnit::TIME_US: {
+        PrintTimeNs(value * THOUSAND, &ss);
         break;
       }
 
@@ -204,13 +194,13 @@ class PrettyPrinter {
     if (value == 0) {
       *unit = "";
       return value;
-    } else if (value >= GIGABYTE || value <= -GIGABYTE) {
+    } else if (value >= GIGABYTE || (value < 0 && value <= -GIGABYTE)) {
       *unit = "GB";
       return value / (double) GIGABYTE;
-    } else if (value >= MEGABYTE || value <= -MEGABYTE ) {
+    } else if (value >= MEGABYTE || (value < 0 && value <= -MEGABYTE)) {
       *unit = "MB";
       return value / (double) MEGABYTE;
-    } else if (value >= KILOBYTE || value <= -KILOBYTE)  {
+    } else if (value >= KILOBYTE || (value < 0 && value <= -KILOBYTE)) {
       *unit = "KB";
       return value / (double) KILOBYTE;
     } else {
@@ -247,7 +237,28 @@ class PrettyPrinter {
     return fmod(value, 1. * modulus);
   }
 
-  /// Print the value (time in ms) to ss
+  /// Pretty print the value (time in ns) to ss.
+  template <typename T>
+  static void PrintTimeNs(T value, std::stringstream* ss) {
+    *ss << std::setprecision(TIME_NS_PRECISION);
+    if (value >= BILLION) {
+      /// If the time is over a second, print it up to ms.
+      value /= MILLION;
+      PrintTimeMs(value, ss);
+    } else if (value >= MILLION) {
+      /// if the time is over a ms, print it up to microsecond in the unit of ms.
+      *ss << DOUBLE_TRUNCATE(static_cast<double>(value) / MILLION, TIME_NS_PRECISION)
+        << "ms";
+    } else if (value > THOUSAND) {
+      /// if the time is over a microsecond, print it using unit microsecond.
+      *ss << DOUBLE_TRUNCATE(static_cast<double>(value) / THOUSAND, TIME_NS_PRECISION)
+        << "us";
+    } else {
+      *ss << DOUBLE_TRUNCATE(value, TIME_NS_PRECISION) << "ns";
+    }
+  }
+
+  /// Print the value (time in ms) to ss.
   template <typename T>
   static void PrintTimeMs(T value, std::stringstream* ss) {
     DCHECK_GE(value, static_cast<T>(0));

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/common/thrift/Metrics.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Metrics.thrift b/common/thrift/Metrics.thrift
index 04f5946..4f2c7f2 100644
--- a/common/thrift/Metrics.thrift
+++ b/common/thrift/Metrics.thrift
@@ -32,7 +32,8 @@ enum TUnit {
   // No units at all, may not be a numerical quantity
   NONE,
   TIME_MS,
-  TIME_S
+  TIME_S,
+  TIME_US
 }
 
 // The kind of value that a metric represents.

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b457741..6328cd4 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1635,5 +1635,15 @@
     "units": "NONE",
     "kind": "PROPERTY",
     "key": "kudu-client.version"
+  },
+  {
+    "description": "Service $0: Total number of incoming RPCs that were rejected due to overflow of the service queue.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Service $0 Incoming RPC Queue Overflows",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "rpc.$0.rpcs_queue_overflow"
   }
 ]

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
new file mode 100644
index 0000000..ec56f41
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import pytest
+import requests
+import time
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.common.skip import SkipIf, SkipIfBuildType
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
+
+@SkipIf.not_krpc
+class TestKrpcMetrics(CustomClusterTestSuite):
+  """Test for KRPC metrics that require special arguments during cluster startup."""
+  RPCZ_URL = 'http://localhost:25000/rpcz?json'
+  METRICS_URL = 'http://localhost:25000/metrics?json'
+  TEST_QUERY = 'select count(*) from tpch_parquet.lineitem l1 \
+      join tpch_parquet.lineitem l2 where l1.l_orderkey = l2.l_orderkey;'
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestKrpcMetrics, cls).setup_class()
+
+  def get_debug_page(self, page_url):
+    """Returns the content of the debug page 'page_url' as json."""
+    response = requests.get(page_url)
+    assert response.status_code == requests.codes.ok
+    return json.loads(response.text)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+                                     -datastream_service_num_svc_threads=1')
+  def test_krpc_queue_overflow_rpcz(self, vector):
+    """Test that rejected RPCs show up on the /rpcz debug web page.
+    """
+    def get_rpc_overflows():
+      rpcz = self.get_debug_page(self.RPCZ_URL)
+      assert len(rpcz['services']) > 0
+      for s in rpcz['services']:
+        if s['service_name'] == 'impala.DataStreamService':
+          return int(s['rpcs_queue_overflow'])
+      assert False, "Could not find DataStreamService metrics"
+
+    before = get_rpc_overflows()
+    assert before == 0
+    self.client.execute(self.TEST_QUERY)
+    after = get_rpc_overflows()
+
+    assert before < after
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+                                     -datastream_service_num_svc_threads=1')
+  def test_krpc_queue_overflow_metrics(self, vector):
+    """Test that rejected RPCs show up on the /metrics debug web page.
+    """
+    def iter_metrics(group):
+      for m in group['metrics']:
+        yield m
+      for c in group['child_groups']:
+        for m in iter_metrics(c):
+          yield m
+
+    def get_metric(name):
+      metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
+      for m in iter_metrics(metrics):
+        if m['name'] == name:
+          return int(m['value'])
+
+    metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow'
+    before = get_metric(metric_name)
+    assert before == 0
+
+    self.client.execute(self.TEST_QUERY)
+    after = get_metric(metric_name)
+    assert before < after

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 8dd17a4..c736cb3 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from tests.common.skip import SkipIf
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 import json
@@ -32,6 +33,7 @@ class TestWebPage(ImpalaTestSuite):
   TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
   QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
   QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
+  RPCZ_URL = "http://localhost:{0}/rpcz"
   THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -72,6 +74,11 @@ class TestWebPage(ImpalaTestSuite):
           and string_to_search in response.text, "Offending url: " + input_url
     return response.text
 
+  def get_debug_page(self, page_url):
+    """Returns the content of the debug page 'page_url' as json."""
+    response = self.get_and_check_status(page_url + "?json", ports_to_test=[25000])
+    return json.loads(response)
+
   def get_and_check_status_jvm(self, url, string_to_search = ""):
     """Calls get_and_check_status() for impalad and catalogd only"""
     return self.get_and_check_status(url, string_to_search,
@@ -229,3 +236,26 @@ class TestWebPage(ImpalaTestSuite):
     for pattern in expected_name_patterns:
       assert any(pattern in t for t in thread_names), \
            "Could not find thread matching '%s'" % pattern
+
+  @SkipIf.not_krpc
+  def test_krpc_rpcz(self):
+    """Test that KRPC metrics are exposed in /rpcz and that they are updated when
+    executing a query."""
+    TEST_QUERY = "select count(c2.string_col) from \
+        functional.alltypestiny join functional.alltypessmall c2"
+    SVC_NAME = 'impala.DataStreamService'
+
+    def get_svc_metrics(svc_name):
+      rpcz = self.get_debug_page(self.RPCZ_URL)
+      assert len(rpcz['services']) > 0
+      for s in rpcz['services']:
+        if s['service_name'] == svc_name:
+          assert len(s['rpc_method_metrics']) > 0, '%s metrics are empty' % svc_name
+          return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name'])
+      assert False, 'Could not find metrics for %s' % svc_name
+
+    before = get_svc_metrics(SVC_NAME)
+    self.client.execute(TEST_QUERY)
+    after = get_svc_metrics(SVC_NAME)
+
+    assert before != after

http://git-wip-us.apache.org/repos/asf/impala/blob/9d7b2103/www/rpcz.tmpl
----------------------------------------------------------------------
diff --git a/www/rpcz.tmpl b/www/rpcz.tmpl
index e62e662..8ade9ef 100644
--- a/www/rpcz.tmpl
+++ b/www/rpcz.tmpl
@@ -18,11 +18,7 @@ under the License.
 -->
 {{> www/common-header.tmpl }}
 
-<h2>RPC durations
-  <button class="btn btn-warning btn-xs" onClick="reset_all();">
-    Reset all
-  </button>
-</h2>
+<h2>RPC durations</h2>
 
 <p class="lead">This page shows the durations of all RPCs served by this
  <samp>{{__common__.process-name}}</samp> process.
@@ -33,6 +29,70 @@ under the License.
   <span id="refresh_on">Auto-refresh on</span>
 </label>  Last updated: <span id="last-updated"></span>
 
+{{?services}}
+<h2>KRPC Services</h2>
+{{/services}}
+{{#services}}
+<h3><samp>{{service_name}}</samp></h3>
+<table class="table table-hover table-bordered" id="{{service_name}}_metrics">
+  <tbody>
+  <tr>
+    <td>
+      <table class="table table-hover">
+        <tr>
+          <th>Queue Size</th>
+          <th>Idle Threads</th>
+          <th>Current Memory Usage</th>
+          <th>Peak Memory Usage</th>
+          <th>RPCs Rejected due to Queue Overflow</th>
+        </tr>
+        <tr>
+          <td id="{{service_name}}_queue_size">{{queue_size}}</td>
+          <td id="{{service_name}}_idle_threads">{{idle_threads}}</td>
+          <td id="{{service_name}}_mem_usage">{{mem_usage}}</td>
+          <td id="{{service_name}}_mem_peak">{{mem_peak}}</td>
+          <td id="{{service_name}}_rpcs_queue_overflow">{{rpcs_queue_overflow}}</td>
+        </tr>
+      </table>
+      <table class="table table-hover">
+        <tr>
+          <th>Incoming Queueing Time</th>
+          <td id="{{service_name}}_incoming_queue_time" colspan=2>{{incoming_queue_time}}
+          </td>
+        </tr>
+      </table>
+    </td>
+  </tr>
+  <tr>
+    <td>
+    {{#rpc_method_metrics}}
+      <table class="table table-bordered table-hover">
+        <tr><td colspan=2>
+          <strong>Method: <i>{{method_name}}</i></strong>
+        </td></tr>
+        <tr>
+          <td>Handler Latency</td>
+          <td class="{{method_name}}_handler_latency">{{handler_latency}}</td>
+        </tr>
+        <tr>
+          <td>Payload Size</td>
+          <td class="{{method_name}}_payload_size">{{payload_size}}</td>
+        </tr>
+      </table>
+    {{/rpc_method_metrics}}
+    </td>
+  </tr>
+  </tbody>
+</table>
+{{/services}}
+
+{{?servers}}
+<h2>Impala RPC Services
+  <button class="btn btn-warning btn-xs" onClick="reset_all();">
+    Reset all
+  </button>
+</h2>
+{{/servers}}
 {{#servers}}
 
 <h3><samp>{{name}} </samp>
@@ -83,44 +143,84 @@ function reset_server(server) {
   xhr.send();
 }
 
+// Update all metrics for services in "servers", which use Impala's old RPC layer.
+function update_impala_services(json) {
+  for (var i = 0; i < json["servers"].length; ++i) {
+    var tbl_json = json["servers"][i];
+    var table = document.getElementById(tbl_json["name"]);
+    if (!table) continue;
+    // Delete all existing rows, stopping at 1 to save the header
+    for (var j = table.rows.length - 1; j >= 1; --j) table.deleteRow(j);
+    tbl_json["methods"].forEach(function(method) {
+      var row = table.insertRow();
+      row.insertCell().innerHTML = "<samp>" + method.name + "</samp>";
+      row.insertCell().innerHTML = method.summary;
+      row.insertCell().innerHTML = method.in_flight;
+      var reset_cell = row.insertCell();
+      reset_cell.align = "center";
+      var button = document.createElement("button");
+      button.className = "btn btn-warning btn-xs";
+      button.appendChild(document.createTextNode("Reset"));
+      button.onclick = function() { reset_method(method.server_name, method.name); }
+      reset_cell.appendChild(button);
+    });
+  }
+}
+
+// Update all krpc metrics from the "services" member of "json".
+function update_krpc_services(json) {
+  // Update each service
+  for (var i = 0; i < json["services"].length; ++i) {
+    var svc_json = json["services"][i];
+    var svc_name = svc_json["service_name"];
+    var table = document.getElementById(svc_name + "_metrics");
+
+    // Skip updates for unknown services.
+    if (!table) continue;
+
+    // Update service metrics
+    var keys = ["queue_size", "idle_threads", "mem_usage", "mem_peak",
+        "rpcs_queue_overflow", "incoming_queue_time"];
+    for (var j = 0; j < keys.length; ++j) {
+      var key = keys[j];
+      var cell = document.getElementById(svc_name + "_" + key);
+      // Skip update for unknown values.
+      if (!cell) continue;
+      cell.innerHTML = svc_json[key];
+    }
+
+    // Update metrics for individual methods.
+    var num_methods = svc_json["rpc_method_metrics"].length;
+    for (var j = 0; j < num_methods; ++j) {
+      var method_json = svc_json["rpc_method_metrics"][j];
+      var method_name = method_json["method_name"];
+      // Update all metrics for this method.
+      var keys = ["handler_latency", "payload_size"];
+      for (var l = 0; l < keys.length; ++l) {
+        var key = keys[l];
+        var cell = $(table).find("." + method_name + "_" + key)[0];
+        // Skip update for unknown values.
+        if (!cell) continue;
+        cell.innerHTML = method_json[key];
+      }
+    }
+  }
+}
+
 function refresh() {
   var xhr = new XMLHttpRequest();
   xhr.responseType = 'text';
   xhr.timeout = 60000;
   xhr.onload = function(ignored_arg) {
-    if (xhr.status != 200) {
-      return;
-    }
+    if (xhr.status != 200) return;
     var blob = xhr.response;
     json = JSON.parse(blob);
-    for (var i = 0; i < json["servers"].length; ++i) {
-      var tbl_json = json["servers"][i];
-      var table = document.getElementById(tbl_json["name"]);
-      if (!table) continue;
-      // Delete all existing rows, stopping at 1 to save the header
-      for (var j = table.rows.length - 1; j >= 1; --j) {
-        table.deleteRow(j);
-      }
-      tbl_json["methods"].forEach(function(method) {
-        var row = table.insertRow();
-        row.insertCell().innerHTML = "<samp>" + method.name + "</samp>";
-        row.insertCell().innerHTML = method.summary;
-        row.insertCell().innerHTML = method.in_flight;
-        var reset_cell = row.insertCell();
-        reset_cell.align = "center";
-        var button = document.createElement("button");
-        button.className = "btn btn-warning btn-xs";
-        button.appendChild(document.createTextNode("Reset"));
-        button.onclick = function() { reset_method(method.server_name, method.name); }
-
-        reset_cell.appendChild(button);
-      });
-    }
-
+    update_impala_services(json);
+    update_krpc_services(json);
     document.getElementById("last-updated").textContent = new Date();
   }
 
-  xhr.ontimeout = function(){ }
+  xhr.ontimeout = function() {}
   xhr.open('GET', "/rpcz?json", true);
   xhr.send();
 }


[3/6] impala git commit: IMPALA-5801: Clean up codegen GetType() interface

Posted by ta...@apache.org.
IMPALA-5801: Clean up codegen GetType() interface

Several functions that return llvm::(Pointer)Type were renamed
to make them shorter or indicate their roles more clearly. Some
additional convenience functions were created to make some common
codegen tasks simpler:

- Get(Ptr)Type functions with string parameter are renamed to
  GetNamed(Ptr)Type
- GetStruct(Ptr)Type template functions are created to make
  GetNamedType(MyStruct::LLVM_CLASS_NAME) calls simpler (some
  classes had LLVM_CLASS_NAME as private; these are changed to
  public)
- integer type convenience functions are renamed to indicate
  bit width instead of matching SQL type (e.g. int_type->i32_type)
- similar convenience functions were created for pointers to primitive
  types, e.g. i32_ptr_type
- Get(Ptr)Type functions with ColumnType parameter are renamed
  to GetSlot(Ptr)Type
- GetIntConstant function is split into several functions depending
  on the type of the integer, e.g. GetI32Constant

The renamed functions can be found in llvm-codegen.h/cc. Changes
in other files are mainly renamed calls with the same functionality.

Testing:
No new tests are necessary, as no functionality was changed.

Change-Id: Ib146ca21af82023b0b460f813eae3435b4b2eb77
Reviewed-on: http://gerrit.cloudera.org:8080/9063
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/632ee044
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/632ee044
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/632ee044

Branch: refs/heads/2.x
Commit: 632ee044b127803b1b270da2338cc8be121ee8b9
Parents: b5d2823
Author: Csaba Ringhofer <cs...@cloudera.com>
Authored: Thu Jan 18 21:06:54 2018 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 01:37:24 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/hash-benchmark.cc          |  28 +++---
 be/src/codegen/codegen-anyval.cc             |  82 +++++++--------
 be/src/codegen/llvm-codegen-test.cc          |  25 ++---
 be/src/codegen/llvm-codegen.cc               | 117 +++++++++-------------
 be/src/codegen/llvm-codegen.h                |  79 +++++++++++----
 be/src/exec/exec-node.cc                     |  10 +-
 be/src/exec/filter-context.cc                |  20 ++--
 be/src/exec/hash-table.cc                    |  68 ++++++-------
 be/src/exec/hdfs-avro-scanner.cc             |  23 ++---
 be/src/exec/hdfs-avro-scanner.h              |   4 +-
 be/src/exec/hdfs-parquet-scanner.cc          |   8 +-
 be/src/exec/hdfs-parquet-scanner.h           |   6 +-
 be/src/exec/hdfs-scanner.cc                  |  59 +++++------
 be/src/exec/partitioned-aggregation-node.cc  |  34 +++----
 be/src/exec/partitioned-hash-join-builder.cc |  20 ++--
 be/src/exec/partitioned-hash-join-builder.h  |   6 +-
 be/src/exec/partitioned-hash-join-node.cc    |  18 ++--
 be/src/exec/text-converter.cc                |  20 ++--
 be/src/exprs/compound-predicates.cc          |   6 +-
 be/src/exprs/scalar-expr.cc                  |  10 +-
 be/src/exprs/scalar-fn-call.cc               |   4 +-
 be/src/exprs/slot-ref.cc                     |  22 ++--
 be/src/runtime/descriptors.cc                |  29 +++---
 be/src/runtime/tuple.cc                      |  38 ++++---
 be/src/runtime/types.cc                      |  12 +--
 be/src/util/tuple-row-compare.cc             |   6 +-
 26 files changed, 359 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index fffbbdf..3f605c9 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -384,11 +384,11 @@ llvm::Function* CodegenCrcHash(LlvmCodeGen* codegen, bool mixed) {
   string name = mixed ? "HashMixed" : "HashInt";
   LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->void_type());
   prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("rows", codegen->GetType(TYPE_INT)));
+      LlvmCodeGen::NamedVariable("rows", codegen->i32_type()));
   prototype.AddArgument(
       LlvmCodeGen::NamedVariable("data", codegen->ptr_type()));
   prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("results", codegen->GetPtrType(TYPE_INT)));
+      LlvmCodeGen::NamedVariable("results", codegen->i32_ptr_type()));
 
   LlvmBuilder builder(codegen->context());
   llvm::Value* args[3];
@@ -406,41 +406,41 @@ llvm::Function* CodegenCrcHash(LlvmCodeGen* codegen, bool mixed) {
 
   llvm::Value* row_size = NULL;
   if (mixed) {
-    row_size = codegen->GetIntConstant(TYPE_INT,
-      sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(StringValue));
+    row_size = codegen->GetI32Constant(
+        sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(StringValue));
   } else {
-    row_size = codegen->GetIntConstant(TYPE_INT, fixed_byte_size);
+    row_size = codegen->GetI32Constant(fixed_byte_size);
   }
-  llvm::Value* dummy_len = codegen->GetIntConstant(TYPE_INT, 0);
+  llvm::Value* dummy_len = codegen->GetI32Constant(0);
 
   // Check loop counter
   llvm::Value* counter_check =
-      builder.CreateICmpSGT(args[0], codegen->GetIntConstant(TYPE_INT, 0));
+      builder.CreateICmpSGT(args[0], codegen->GetI32Constant(0));
   builder.CreateCondBr(counter_check, loop_body, loop_exit);
 
   // Loop body
   builder.SetInsertPoint(loop_body);
-  llvm::PHINode* counter = builder.CreatePHI(codegen->GetType(TYPE_INT), 2, "counter");
-  counter->addIncoming(codegen->GetIntConstant(TYPE_INT, 0), loop_start);
+  llvm::PHINode* counter = builder.CreatePHI(codegen->i32_type(), 2, "counter");
+  counter->addIncoming(codegen->GetI32Constant(0), loop_start);
 
   llvm::Value* next_counter =
-      builder.CreateAdd(counter, codegen->GetIntConstant(TYPE_INT, 1));
+      builder.CreateAdd(counter, codegen->GetI32Constant(1));
   counter->addIncoming(next_counter, loop_body);
 
   // Hash the current data
   llvm::Value* offset = builder.CreateMul(counter, row_size);
   llvm::Value* data = builder.CreateGEP(args[1], offset);
 
-  llvm::Value* seed = codegen->GetIntConstant(TYPE_INT, HashUtil::FNV_SEED);
+  llvm::Value* seed = codegen->GetI32Constant(HashUtil::FNV_SEED);
   seed =
       builder.CreateCall(fixed_fn, llvm::ArrayRef<llvm::Value*>({data, dummy_len, seed}));
 
   // Get the string data
   if (mixed) {
     llvm::Value* string_data =
-        builder.CreateGEP(data, codegen->GetIntConstant(TYPE_INT, fixed_byte_size));
-    llvm::Value* string_val =
-        builder.CreateBitCast(string_data, codegen->GetPtrType(TYPE_STRING));
+        builder.CreateGEP(data, codegen->GetI32Constant(fixed_byte_size));
+    llvm::Value* string_val = builder.CreateBitCast(string_data,
+            codegen->GetSlotPtrType(TYPE_STRING));
     llvm::Value* str_ptr = builder.CreateStructGEP(NULL, string_val, 0);
     llvm::Value* str_len = builder.CreateStructGEP(NULL, string_val, 1);
     str_ptr = builder.CreateLoad(str_ptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/codegen/codegen-anyval.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.cc b/be/src/codegen/codegen-anyval.cc
index f7edec2..6575cfd 100644
--- a/be/src/codegen/codegen-anyval.cc
+++ b/be/src/codegen/codegen-anyval.cc
@@ -38,31 +38,31 @@ const char* CodegenAnyVal::LLVM_DECIMALVAL_NAME   = "struct.impala_udf::DecimalV
 llvm::Type* CodegenAnyVal::GetLoweredType(LlvmCodeGen* cg, const ColumnType& type) {
   switch(type.type) {
     case TYPE_BOOLEAN: // i16
-      return cg->smallint_type();
+      return cg->i16_type();
     case TYPE_TINYINT: // i16
-      return cg->smallint_type();
+      return cg->i16_type();
     case TYPE_SMALLINT: // i32
-      return cg->int_type();
+      return cg->i32_type();
     case TYPE_INT: // i64
-      return cg->bigint_type();
+      return cg->i64_type();
     case TYPE_BIGINT: // { i8, i64 }
-      return llvm::StructType::get(cg->tinyint_type(), cg->bigint_type(), NULL);
+      return llvm::StructType::get(cg->i8_type(), cg->i64_type(), NULL);
     case TYPE_FLOAT: // i64
-      return cg->bigint_type();
+      return cg->i64_type();
     case TYPE_DOUBLE: // { i8, double }
-      return llvm::StructType::get(cg->tinyint_type(), cg->double_type(), NULL);
+      return llvm::StructType::get(cg->i8_type(), cg->double_type(), NULL);
     case TYPE_STRING: // { i64, i8* }
     case TYPE_VARCHAR: // { i64, i8* }
     case TYPE_FIXED_UDA_INTERMEDIATE: // { i64, i8* }
-      return llvm::StructType::get(cg->bigint_type(), cg->ptr_type(), NULL);
+      return llvm::StructType::get(cg->i64_type(), cg->ptr_type(), NULL);
     case TYPE_CHAR:
       DCHECK(false) << "NYI:" << type.DebugString();
       return NULL;
     case TYPE_TIMESTAMP: // { i64, i64 }
-      return llvm::StructType::get(cg->bigint_type(), cg->bigint_type(), NULL);
+      return llvm::StructType::get(cg->i64_type(), cg->i64_type(), NULL);
     case TYPE_DECIMAL: // %"struct.impala_udf::DecimalVal" (isn't lowered)
                        // = { {i8}, [15 x i8], {i128} }
-      return cg->GetType(LLVM_DECIMALVAL_NAME);
+      return cg->GetNamedType(LLVM_DECIMALVAL_NAME);
     default:
       DCHECK(false) << "Unsupported type: " << type;
       return NULL;
@@ -78,39 +78,39 @@ llvm::Type* CodegenAnyVal::GetUnloweredType(LlvmCodeGen* cg, const ColumnType& t
   llvm::Type* result;
   switch(type.type) {
     case TYPE_BOOLEAN:
-      result = cg->GetType(LLVM_BOOLEANVAL_NAME);
+      result = cg->GetNamedType(LLVM_BOOLEANVAL_NAME);
       break;
     case TYPE_TINYINT:
-      result = cg->GetType(LLVM_TINYINTVAL_NAME);
+      result = cg->GetNamedType(LLVM_TINYINTVAL_NAME);
       break;
     case TYPE_SMALLINT:
-      result = cg->GetType(LLVM_SMALLINTVAL_NAME);
+      result = cg->GetNamedType(LLVM_SMALLINTVAL_NAME);
       break;
     case TYPE_INT:
-      result = cg->GetType(LLVM_INTVAL_NAME);
+      result = cg->GetNamedType(LLVM_INTVAL_NAME);
       break;
     case TYPE_BIGINT:
-      result = cg->GetType(LLVM_BIGINTVAL_NAME);
+      result = cg->GetNamedType(LLVM_BIGINTVAL_NAME);
       break;
     case TYPE_FLOAT:
-      result = cg->GetType(LLVM_FLOATVAL_NAME);
+      result = cg->GetNamedType(LLVM_FLOATVAL_NAME);
       break;
     case TYPE_DOUBLE:
-      result = cg->GetType(LLVM_DOUBLEVAL_NAME);
+      result = cg->GetNamedType(LLVM_DOUBLEVAL_NAME);
       break;
     case TYPE_STRING:
     case TYPE_VARCHAR:
     case TYPE_FIXED_UDA_INTERMEDIATE:
-      result = cg->GetType(LLVM_STRINGVAL_NAME);
+      result = cg->GetNamedType(LLVM_STRINGVAL_NAME);
       break;
     case TYPE_CHAR:
       DCHECK(false) << "NYI:" << type.DebugString();
       return NULL;
     case TYPE_TIMESTAMP:
-      result = cg->GetType(LLVM_TIMESTAMPVAL_NAME);
+      result = cg->GetNamedType(LLVM_TIMESTAMPVAL_NAME);
       break;
     case TYPE_DECIMAL:
-      result = cg->GetType(LLVM_DECIMALVAL_NAME);
+      result = cg->GetNamedType(LLVM_DECIMALVAL_NAME);
       break;
     default:
       DCHECK(false) << "Unsupported type: " << type;
@@ -134,7 +134,7 @@ llvm::Value* CodegenAnyVal::CreateCall(LlvmCodeGen* cg, LlvmBuilder* builder,
     llvm::Function::arg_iterator ret_arg = fn->arg_begin();
     DCHECK(ret_arg->getType()->isPointerTy());
     llvm::Type* ret_type = ret_arg->getType()->getPointerElementType();
-    DCHECK_EQ(ret_type, cg->GetType(LLVM_DECIMALVAL_NAME));
+    DCHECK_EQ(ret_type, cg->GetNamedType(LLVM_DECIMALVAL_NAME));
 
     // We need to pass a DecimalVal pointer to 'fn' that will be populated with the result
     // value. Use 'result_ptr' if specified, otherwise alloca one.
@@ -189,15 +189,15 @@ llvm::Value* CodegenAnyVal::GetIsNull(const char* name) const {
     case TYPE_DOUBLE: {
       // Lowered type is of form { i8, * }. Get the i8 value.
       llvm::Value* is_null_i8 = builder_->CreateExtractValue(value_, 0);
-      DCHECK(is_null_i8->getType() == codegen_->tinyint_type());
-      return builder_->CreateTrunc(is_null_i8, codegen_->boolean_type(), name);
+      DCHECK(is_null_i8->getType() == codegen_->i8_type());
+      return builder_->CreateTrunc(is_null_i8, codegen_->bool_type(), name);
     }
     case TYPE_DECIMAL: {
       // Lowered type is of the form { {i8}, ... }
       uint32_t idxs[] = {0, 0};
       llvm::Value* is_null_i8 = builder_->CreateExtractValue(value_, idxs);
-      DCHECK(is_null_i8->getType() == codegen_->tinyint_type());
-      return builder_->CreateTrunc(is_null_i8, codegen_->boolean_type(), name);
+      DCHECK(is_null_i8->getType() == codegen_->i8_type());
+      return builder_->CreateTrunc(is_null_i8, codegen_->bool_type(), name);
     }
     case TYPE_STRING:
     case TYPE_VARCHAR:
@@ -205,8 +205,8 @@ llvm::Value* CodegenAnyVal::GetIsNull(const char* name) const {
     case TYPE_TIMESTAMP: {
       // Lowered type is of form { i64, *}. Get the first byte of the i64 value.
       llvm::Value* v = builder_->CreateExtractValue(value_, 0);
-      DCHECK(v->getType() == codegen_->bigint_type());
-      return builder_->CreateTrunc(v, codegen_->boolean_type(), name);
+      DCHECK(v->getType() == codegen_->i64_type());
+      return builder_->CreateTrunc(v, codegen_->bool_type(), name);
     }
     case TYPE_CHAR:
       DCHECK(false) << "NYI:" << type_.DebugString();
@@ -217,7 +217,7 @@ llvm::Value* CodegenAnyVal::GetIsNull(const char* name) const {
     case TYPE_INT:
     case TYPE_FLOAT:
       // Lowered type is an integer. Get the first byte.
-      return builder_->CreateTrunc(value_, codegen_->boolean_type(), name);
+      return builder_->CreateTrunc(value_, codegen_->bool_type(), name);
     default:
       DCHECK(false);
       return NULL;
@@ -230,7 +230,7 @@ void CodegenAnyVal::SetIsNull(llvm::Value* is_null) {
     case TYPE_DOUBLE: {
       // Lowered type is of form { i8, * }. Set the i8 value to 'is_null'.
       llvm::Value* is_null_ext =
-          builder_->CreateZExt(is_null, codegen_->tinyint_type(), "is_null_ext");
+          builder_->CreateZExt(is_null, codegen_->i8_type(), "is_null_ext");
       value_ = builder_->CreateInsertValue(value_, is_null_ext, 0, name_);
       break;
     }
@@ -238,7 +238,7 @@ void CodegenAnyVal::SetIsNull(llvm::Value* is_null) {
       // Lowered type is of form { {i8}, [15 x i8], {i128} }. Set the i8 value to
       // 'is_null'.
       llvm::Value* is_null_ext =
-          builder_->CreateZExt(is_null, codegen_->tinyint_type(), "is_null_ext");
+          builder_->CreateZExt(is_null, codegen_->i8_type(), "is_null_ext");
       // Index into the {i8} struct as well as the outer struct.
       uint32_t idxs[] = {0, 0};
       value_ = builder_->CreateInsertValue(value_, is_null_ext, idxs, name_);
@@ -318,7 +318,8 @@ llvm::Value* CodegenAnyVal::GetVal(const char* name) {
       // different width int types.)
       uint32_t idxs[] = {2, 0};
       llvm::Value* val = builder_->CreateExtractValue(value_, idxs, name);
-      return builder_->CreateTrunc(val, codegen_->GetType(type_), name);
+      return builder_->CreateTrunc(val,
+          codegen_->GetSlotType(type_), name);
     }
     default:
       DCHECK(false) << "Unsupported type: " << type_;
@@ -346,7 +347,7 @@ void CodegenAnyVal::SetVal(llvm::Value* val) {
     }
     case TYPE_FLOAT:
       // Same as above, but we must cast 'val' to an integer type.
-      val = builder_->CreateBitCast(val, codegen_->int_type());
+      val = builder_->CreateBitCast(val, codegen_->i32_type());
       value_ = SetHighBits(32, val, value_, name_);
       break;
     case TYPE_BIGINT:
@@ -485,10 +486,11 @@ llvm::Value* CodegenAnyVal::GetUnloweredPtr(const string& name) const {
 void CodegenAnyVal::LoadFromNativePtr(llvm::Value* raw_val_ptr) {
   DCHECK(raw_val_ptr->getType()->isPointerTy());
   llvm::Type* raw_val_type = raw_val_ptr->getType()->getPointerElementType();
-  DCHECK_EQ(raw_val_type, codegen_->GetType(type_))
+  DCHECK_EQ(raw_val_type, codegen_->GetSlotType(type_))
       << endl
       << LlvmCodeGen::Print(raw_val_ptr) << endl
-      << type_ << " => " << LlvmCodeGen::Print(codegen_->GetType(type_));
+      << type_ << " => " << LlvmCodeGen::Print(
+          codegen_->GetSlotType(type_));
   switch (type_.type) {
     case TYPE_STRING:
     case TYPE_VARCHAR: {
@@ -501,7 +503,7 @@ void CodegenAnyVal::LoadFromNativePtr(llvm::Value* raw_val_ptr) {
     case TYPE_FIXED_UDA_INTERMEDIATE: {
       // Convert fixed-size slot to StringVal.
       SetPtr(builder_->CreateBitCast(raw_val_ptr, codegen_->ptr_type()));
-      SetLen(codegen_->GetIntConstant(TYPE_INT, type_.len));
+      SetLen(codegen_->GetI32Constant(type_.len));
       break;
     }
     case TYPE_CHAR:
@@ -544,7 +546,7 @@ void CodegenAnyVal::LoadFromNativePtr(llvm::Value* raw_val_ptr) {
 }
 
 void CodegenAnyVal::StoreToNativePtr(llvm::Value* raw_val_ptr, llvm::Value* pool_val) {
-  llvm::Type* raw_type = codegen_->GetType(type_);
+  llvm::Type* raw_type = codegen_->GetSlotType(type_);
   switch (type_.type) {
     case TYPE_STRING:
     case TYPE_VARCHAR: {
@@ -602,8 +604,8 @@ void CodegenAnyVal::StoreToNativePtr(llvm::Value* raw_val_ptr, llvm::Value* pool
 }
 
 llvm::Value* CodegenAnyVal::ToNativePtr(llvm::Value* pool_val) {
-  llvm::Value* native_ptr =
-      codegen_->CreateEntryBlockAlloca(*builder_, codegen_->GetType(type_));
+  llvm::Value* native_ptr = codegen_->CreateEntryBlockAlloca(*builder_,
+      codegen_->GetSlotType(type_));
   StoreToNativePtr(native_ptr, pool_val);
   return native_ptr;
 }
@@ -747,7 +749,7 @@ llvm::Value* CodegenAnyVal::Compare(CodegenAnyVal* other, const char* name) {
   llvm::Value* void_v2 = builder_->CreateBitCast(v2, codegen_->ptr_type());
   // Create a global constant of the values' ColumnType. It needs to be a constant
   // for constant propagation and dead code elimination in 'compare_fn'.
-  llvm::Type* col_type = codegen_->GetType(ColumnType::LLVM_CLASS_NAME);
+  llvm::Type* col_type = codegen_->GetStructType<ColumnType>();
   llvm::Constant* type_ptr =
       codegen_->ConstantToGVPtr(col_type, type_.ToIR(codegen_), "type");
   llvm::Function* compare_fn =
@@ -797,7 +799,7 @@ llvm::Value* CodegenAnyVal::GetNullVal(LlvmCodeGen* codegen, llvm::Type* val_typ
   if (val_type->isStructTy()) {
     llvm::StructType* struct_type = llvm::cast<llvm::StructType>(val_type);
     if (struct_type->getNumElements() == 3) {
-      DCHECK_EQ(val_type, codegen->GetType(LLVM_DECIMALVAL_NAME));
+      DCHECK_EQ(val_type, codegen->GetNamedType(LLVM_DECIMALVAL_NAME));
       // Return the struct { {1}, 0, 0 } (the 'is_null' byte, i.e. the first value's first
       // byte, is set to 1, the other bytes don't matter)
       llvm::StructType* anyval_struct_type =

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index 9069898..dbb190a 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -160,9 +160,9 @@ llvm::Function* CodegenInnerLoop(
   codegen->CodegenDebugTrace(&builder, "Jitted\n");
 
   // Store &jitted_counter as a constant.
-  llvm::Value* const_delta = llvm::ConstantInt::get(context, llvm::APInt(64, delta));
+  llvm::Value* const_delta = codegen->GetI64Constant(delta);
   llvm::Value* counter_ptr =
-      codegen->CastPtrToLlvmPtr(codegen->GetPtrType(TYPE_BIGINT), jitted_counter);
+      codegen->CastPtrToLlvmPtr(codegen->i64_ptr_type(), jitted_counter);
   llvm::Value* loaded_counter = builder.CreateLoad(counter_ptr);
   llvm::Value* incremented_value = builder.CreateAdd(loaded_counter, const_delta);
   builder.CreateStore(incremented_value, counter_ptr);
@@ -284,10 +284,11 @@ TEST_F(LlvmCodeGenTest, ReplaceFnCall) {
 //   ret i32 %len
 // }
 llvm::Function* CodegenStringTest(LlvmCodeGen* codegen) {
-  llvm::PointerType* string_val_ptr_type = codegen->GetPtrType(TYPE_STRING);
+  llvm::PointerType* string_val_ptr_type =
+      codegen->GetSlotPtrType(TYPE_STRING);
   EXPECT_TRUE(string_val_ptr_type != NULL);
 
-  LlvmCodeGen::FnPrototype prototype(codegen, "StringTest", codegen->GetType(TYPE_INT));
+  LlvmCodeGen::FnPrototype prototype(codegen, "StringTest", codegen->i32_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("str", string_val_ptr_type));
   LlvmBuilder builder(codegen->context());
 
@@ -297,15 +298,15 @@ llvm::Function* CodegenStringTest(LlvmCodeGen* codegen) {
   // strval->ptr[0] = 'A'
   llvm::Value* str_ptr = builder.CreateStructGEP(NULL, str, 0, "str_ptr");
   llvm::Value* ptr = builder.CreateLoad(str_ptr, "ptr");
-  llvm::Value* first_char_offset[] = {codegen->GetIntConstant(TYPE_INT, 0)};
+  llvm::Value* first_char_offset[] = {codegen->GetI32Constant(0)};
   llvm::Value* first_char_ptr =
       builder.CreateGEP(ptr, first_char_offset, "first_char_ptr");
-  builder.CreateStore(codegen->GetIntConstant(TYPE_TINYINT, 'A'), first_char_ptr);
+  builder.CreateStore(codegen->GetI8Constant('A'), first_char_ptr);
 
   // Update and return old len
   llvm::Value* len_ptr = builder.CreateStructGEP(NULL, str, 1, "len_ptr");
   llvm::Value* len = builder.CreateLoad(len_ptr, "len");
-  builder.CreateStore(codegen->GetIntConstant(TYPE_INT, 1), len_ptr);
+  builder.CreateStore(codegen->GetI32Constant(1), len_ptr);
   builder.CreateRet(len);
 
   return codegen->FinalizeFunction(interop_fn);
@@ -363,7 +364,7 @@ TEST_F(LlvmCodeGenTest, MemcpyTest) {
   LlvmCodeGen::FnPrototype prototype(codegen.get(), "MemcpyTest", codegen->void_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("dest", codegen->ptr_type()));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("src", codegen->ptr_type()));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->GetType(TYPE_INT)));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("n", codegen->i32_type()));
 
   LlvmBuilder builder(codegen->context());
 
@@ -412,8 +413,8 @@ TEST_F(LlvmCodeGenTest, HashTest) {
         codegen->CastPtrToLlvmPtr(codegen->ptr_type(), const_cast<char*>(data1));
     llvm::Value* llvm_data2 =
         codegen->CastPtrToLlvmPtr(codegen->ptr_type(), const_cast<char*>(data2));
-    llvm::Value* llvm_len1 = codegen->GetIntConstant(TYPE_INT, strlen(data1));
-    llvm::Value* llvm_len2 = codegen->GetIntConstant(TYPE_INT, strlen(data2));
+    llvm::Value* llvm_len1 = codegen->GetI32Constant(strlen(data1));
+    llvm::Value* llvm_len2 = codegen->GetI32Constant(strlen(data2));
 
     uint32_t expected_hash = 0;
     expected_hash = HashUtil::Hash(data1, strlen(data1), expected_hash);
@@ -423,7 +424,7 @@ TEST_F(LlvmCodeGenTest, HashTest) {
     // Create a codegen'd function that hashes all the types and returns the results.
     // The tuple/values to hash are baked into the codegen for simplicity.
     LlvmCodeGen::FnPrototype prototype(
-        codegen.get(), "HashTest", codegen->GetType(TYPE_INT));
+        codegen.get(), "HashTest", codegen->i32_type());
     LlvmBuilder builder(codegen->context());
 
     // Test both byte-size specific hash functions and the generic loop hash function
@@ -436,7 +437,7 @@ TEST_F(LlvmCodeGenTest, HashTest) {
     ASSERT_TRUE(data2_hash_fn != NULL);
     ASSERT_TRUE(generic_hash_fn != NULL);
 
-    llvm::Value* seed = codegen->GetIntConstant(TYPE_INT, 0);
+    llvm::Value* seed = codegen->GetI32Constant(0);
     seed = builder.CreateCall(
         data1_hash_fn, llvm::ArrayRef<llvm::Value*>({llvm_data1, llvm_len1, seed}));
     seed = builder.CreateCall(

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index e1a606c..892b395 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -355,10 +355,10 @@ Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
   SCOPED_TIMER(codegen->prepare_module_timer_);
 
   // Get type for StringValue
-  codegen->string_value_type_ = codegen->GetType(StringValue::LLVM_CLASS_NAME);
+  codegen->string_value_type_ = codegen->GetStructType<StringValue>();
 
   // Get type for TimestampValue
-  codegen->timestamp_value_type_ = codegen->GetType(TimestampValue::LLVM_CLASS_NAME);
+  codegen->timestamp_value_type_ = codegen->GetStructType<TimestampValue>();
 
   // Verify size is correct
   const llvm::DataLayout& data_layout = codegen->execution_engine()->getDataLayout();
@@ -411,7 +411,7 @@ Status LlvmCodeGen::Init(unique_ptr<llvm::Module> module) {
   module_->setDataLayout(execution_engine_->getDataLayout());
 
   void_type_ = llvm::Type::getVoidTy(context());
-  ptr_type_ = llvm::PointerType::get(GetType(TYPE_TINYINT), 0);
+  ptr_type_ = llvm::PointerType::get(i8_type(), 0);
   true_value_ = llvm::ConstantInt::get(context(), llvm::APInt(1, true, true));
   false_value_ = llvm::ConstantInt::get(context(), llvm::APInt(1, false, true));
 
@@ -479,30 +479,30 @@ string LlvmCodeGen::GetIR(bool full_module) const {
   return str;
 }
 
-llvm::Type* LlvmCodeGen::GetType(const ColumnType& type) {
+llvm::Type* LlvmCodeGen::GetSlotType(const ColumnType& type) {
   switch (type.type) {
     case TYPE_NULL:
       return llvm::Type::getInt1Ty(context());
     case TYPE_BOOLEAN:
-      return llvm::Type::getInt1Ty(context());
+      return bool_type();
     case TYPE_TINYINT:
-      return llvm::Type::getInt8Ty(context());
+      return i8_type();
     case TYPE_SMALLINT:
-      return llvm::Type::getInt16Ty(context());
+      return i16_type();
     case TYPE_INT:
-      return llvm::Type::getInt32Ty(context());
+      return i32_type();
     case TYPE_BIGINT:
-      return llvm::Type::getInt64Ty(context());
+      return i64_type();
     case TYPE_FLOAT:
-      return llvm::Type::getFloatTy(context());
+      return float_type();
     case TYPE_DOUBLE:
-      return llvm::Type::getDoubleTy(context());
+      return double_type();
     case TYPE_STRING:
     case TYPE_VARCHAR:
       return string_value_type_;
     case TYPE_FIXED_UDA_INTERMEDIATE:
       // Represent this as an array of bytes.
-      return llvm::ArrayType::get(GetType(TYPE_TINYINT), type.len);
+      return llvm::ArrayType::get(i8_type(), type.len);
     case TYPE_CHAR:
       // IMPALA-3207: Codegen for CHAR is not yet implemented, this should not
       // be called for TYPE_CHAR.
@@ -518,18 +518,18 @@ llvm::Type* LlvmCodeGen::GetType(const ColumnType& type) {
   }
 }
 
-llvm::PointerType* LlvmCodeGen::GetPtrType(const ColumnType& type) {
-  return llvm::PointerType::get(GetType(type), 0);
+llvm::PointerType* LlvmCodeGen::GetSlotPtrType(const ColumnType& type) {
+  return llvm::PointerType::get(GetSlotType(type), 0);
 }
 
-llvm::Type* LlvmCodeGen::GetType(const string& name) {
+llvm::Type* LlvmCodeGen::GetNamedType(const string& name) {
   llvm::Type* type = module_->getTypeByName(name);
   DCHECK(type != NULL) << name;
   return type;
 }
 
-llvm::PointerType* LlvmCodeGen::GetPtrType(const string& name) {
-  llvm::Type* type = GetType(name);
+llvm::PointerType* LlvmCodeGen::GetNamedPtrType(const string& name) {
+  llvm::Type* type = GetNamedType(name);
   DCHECK(type != NULL) << name;
   return llvm::PointerType::get(type, 0);
 }
@@ -542,34 +542,17 @@ llvm::PointerType* LlvmCodeGen::GetPtrPtrType(llvm::Type* type) {
   return llvm::PointerType::get(llvm::PointerType::get(type, 0), 0);
 }
 
-llvm::PointerType* LlvmCodeGen::GetPtrPtrType(const string& name) {
-  return llvm::PointerType::get(GetPtrType(name), 0);
+llvm::PointerType* LlvmCodeGen::GetNamedPtrPtrType(const string& name) {
+  return llvm::PointerType::get(GetNamedPtrType(name), 0);
 }
 
 // Llvm doesn't let you create a PointerValue from a c-side ptr.  Instead
 // cast it to an int and then to 'type'.
 llvm::Value* LlvmCodeGen::CastPtrToLlvmPtr(llvm::Type* type, const void* ptr) {
-  llvm::Constant* const_int =
-      llvm::ConstantInt::get(llvm::Type::getInt64Ty(context()), (int64_t)ptr);
+  llvm::Constant* const_int = GetI64Constant((int64_t)ptr);
   return llvm::ConstantExpr::getIntToPtr(const_int, type);
 }
 
-llvm::Constant* LlvmCodeGen::GetIntConstant(PrimitiveType type, uint64_t val) {
-  switch (type) {
-    case TYPE_TINYINT:
-      return llvm::ConstantInt::get(context(), llvm::APInt(8, val));
-    case TYPE_SMALLINT:
-      return llvm::ConstantInt::get(context(), llvm::APInt(16, val));
-    case TYPE_INT:
-      return llvm::ConstantInt::get(context(), llvm::APInt(32, val));
-    case TYPE_BIGINT:
-      return llvm::ConstantInt::get(context(), llvm::APInt(64, val));
-    default:
-      DCHECK(false);
-      return NULL;
-  }
-}
-
 llvm::Constant* LlvmCodeGen::GetIntConstant(
     int num_bytes, uint64_t low_bits, uint64_t high_bits) {
   DCHECK_GE(num_bytes, 1);
@@ -593,7 +576,7 @@ llvm::AllocaInst* LlvmCodeGen::CreateEntryBlockAlloca(
     llvm::Function* f, const NamedVariable& var) {
   llvm::IRBuilder<> tmp(&f->getEntryBlock(), f->getEntryBlock().begin());
   llvm::AllocaInst* alloca = tmp.CreateAlloca(var.type, NULL, var.name.c_str());
-  if (var.type == GetType(CodegenAnyVal::LLVM_DECIMALVAL_NAME)) {
+  if (var.type == GetNamedType(CodegenAnyVal::LLVM_DECIMALVAL_NAME)) {
     // Generated functions may manipulate DecimalVal arguments via SIMD instructions such
     // as 'movaps' that require 16-byte memory alignment. LLVM uses 8-byte alignment by
     // default, so explicitly set the alignment for DecimalVals.
@@ -613,7 +596,7 @@ llvm::AllocaInst* LlvmCodeGen::CreateEntryBlockAlloca(const LlvmBuilder& builder
   llvm::Function* fn = builder.GetInsertBlock()->getParent();
   llvm::IRBuilder<> tmp(&fn->getEntryBlock(), fn->getEntryBlock().begin());
   llvm::AllocaInst* alloca =
-      tmp.CreateAlloca(type, GetIntConstant(TYPE_INT, num_entries), name);
+      tmp.CreateAlloca(type, GetI32Constant(num_entries), name);
   alloca->setAlignment(alignment);
   return alloca;
 }
@@ -837,7 +820,7 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const std::string& symbol,
     }
 
     // The "FunctionContext*" argument.
-    prototype.AddArgument("ctx", GetPtrType("class.impala_udf::FunctionContext"));
+    prototype.AddArgument("ctx", GetNamedPtrType("class.impala_udf::FunctionContext"));
 
     // The "fixed" arguments for the UDF function, followed by the variable arguments,
     // if any.
@@ -847,7 +830,7 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const std::string& symbol,
     }
 
     if (has_varargs) {
-      prototype.AddArgument("num_var_arg", GetType(TYPE_INT));
+      prototype.AddArgument("num_var_arg", i32_type());
       // Get the vararg type from the first vararg.
       prototype.AddArgument(
           "var_arg", CodegenAnyVal::GetUnloweredPtrType(this, arg_types[num_fixed_args]));
@@ -935,8 +918,7 @@ int LlvmCodeGen::ReplaceCallSitesWithValue(
 
 int LlvmCodeGen::ReplaceCallSitesWithBoolConst(llvm::Function* caller, bool constant,
     const string& target_name) {
-  llvm::Value* replacement =
-      llvm::ConstantInt::get(llvm::Type::getInt1Ty(context()), constant);
+  llvm::Value* replacement = GetBoolConstant(constant);
   return ReplaceCallSitesWithValue(caller, replacement, target_name);
 }
 
@@ -975,7 +957,7 @@ int LlvmCodeGen::InlineConstFnAttrs(const FunctionContext::TypeDesc& ret_type,
     int i_val = static_cast<int>(i_arg->getSExtValue());
     DCHECK(state_ != nullptr);
     // All supported constants are currently integers.
-    call_instr->replaceAllUsesWith(llvm::ConstantInt::get(GetType(TYPE_INT),
+    call_instr->replaceAllUsesWith(GetI32Constant(
         FunctionContextImpl::GetConstFnAttr(state_, ret_type, arg_types, t_val, i_val)));
     call_instr->eraseFromParent();
     ++replaced;
@@ -1247,7 +1229,7 @@ void LlvmCodeGen::DestroyModule() {
 void LlvmCodeGen::AddFunctionToJit(llvm::Function* fn, void** fn_ptr) {
   DCHECK(finalized_functions_.find(fn) != finalized_functions_.end())
       << "Attempted to add a non-finalized function to Jit: " << fn->getName().str();
-  llvm::Type* decimal_val_type = GetType(CodegenAnyVal::LLVM_DECIMALVAL_NAME);
+  llvm::Type* decimal_val_type = GetNamedType(CodegenAnyVal::LLVM_DECIMALVAL_NAME);
   if (fn->getReturnType() == decimal_val_type) {
     // Per the x86 calling convention ABI, DecimalVals should be returned via an extra
     // first DecimalVal* argument. We generate non-compliant functions that return the
@@ -1400,7 +1382,7 @@ void LlvmCodeGen::CodegenMinMax(LlvmBuilder* builder, const ColumnType& type,
 Status LlvmCodeGen::LoadIntrinsics() {
   // Load memcpy
   {
-    llvm::Type* types[] = {ptr_type(), ptr_type(), GetType(TYPE_INT)};
+    llvm::Type* types[] = {ptr_type(), ptr_type(), i32_type()};
     llvm::Function* fn =
         llvm::Intrinsic::getDeclaration(module_, llvm::Intrinsic::memcpy, types);
     if (fn == NULL) {
@@ -1440,7 +1422,7 @@ void LlvmCodeGen::CodegenMemcpy(
     LlvmBuilder* builder, llvm::Value* dst, llvm::Value* src, int size) {
   DCHECK_GE(size, 0);
   if (size == 0) return;
-  llvm::Value* size_val = GetIntConstant(TYPE_BIGINT, size);
+  llvm::Value* size_val = GetI64Constant(size);
   CodegenMemcpy(builder, dst, src, size_val);
 }
 
@@ -1456,15 +1438,14 @@ void LlvmCodeGen::CodegenMemset(
   DCHECK(dst->getType()->isPointerTy()) << Print(dst);
   DCHECK_GE(size, 0);
   if (size == 0) return;
-  llvm::Value* value_const = GetIntConstant(TYPE_TINYINT, value);
+  llvm::Value* value_const = GetI8Constant(value);
   builder->CreateMemSet(dst, value_const, size, /* no alignment */ 0);
 }
 
 void LlvmCodeGen::CodegenClearNullBits(
     LlvmBuilder* builder, llvm::Value* tuple_ptr, const TupleDescriptor& tuple_desc) {
   llvm::Value* int8_ptr = builder->CreateBitCast(tuple_ptr, ptr_type(), "int8_ptr");
-  llvm::Value* null_bytes_offset =
-      llvm::ConstantInt::get(int_type(), tuple_desc.null_bytes_offset());
+  llvm::Value* null_bytes_offset = GetI32Constant(tuple_desc.null_bytes_offset());
   llvm::Value* null_bytes_ptr =
       builder->CreateInBoundsGEP(int8_ptr, null_bytes_offset, "null_bytes_ptr");
   CodegenMemset(builder, null_bytes_ptr, 0, tuple_desc.num_null_bytes());
@@ -1475,13 +1456,13 @@ llvm::Value* LlvmCodeGen::CodegenMemPoolAllocate(LlvmBuilder* builder,
   DCHECK(pool_val != nullptr);
   DCHECK(size_val->getType()->isIntegerTy());
   DCHECK_LE(size_val->getType()->getIntegerBitWidth(), 64);
-  DCHECK_EQ(pool_val->getType(), GetPtrType(MemPool::LLVM_CLASS_NAME));
+  DCHECK_EQ(pool_val->getType(), GetStructPtrType<MemPool>());
   // Extend 'size_val' to i64 if necessary
   if (size_val->getType()->getIntegerBitWidth() < 64) {
-    size_val = builder->CreateSExt(size_val, bigint_type());
+    size_val = builder->CreateSExt(size_val, i64_type());
   }
   llvm::Function* allocate_fn = GetFunction(IRFunction::MEMPOOL_ALLOCATE, false);
-  llvm::Value* alignment = GetIntConstant(TYPE_INT, MemPool::DEFAULT_ALIGNMENT);
+  llvm::Value* alignment = GetI32Constant(MemPool::DEFAULT_ALIGNMENT);
   llvm::Value* fn_args[] = {pool_val, size_val, alignment};
   return builder->CreateCall(allocate_fn, fn_args, name);
 }
@@ -1544,10 +1525,10 @@ llvm::Function* LlvmCodeGen::GetHashFunction(int num_bytes) {
     // Generate a function to hash these bytes
     stringstream ss;
     ss << "CrcHash" << num_bytes;
-    FnPrototype prototype(this, ss.str(), GetType(TYPE_INT));
+    FnPrototype prototype(this, ss.str(), i32_type());
     prototype.AddArgument(LlvmCodeGen::NamedVariable("data", ptr_type()));
-    prototype.AddArgument(LlvmCodeGen::NamedVariable("len", GetType(TYPE_INT)));
-    prototype.AddArgument(LlvmCodeGen::NamedVariable("seed", GetType(TYPE_INT)));
+    prototype.AddArgument(LlvmCodeGen::NamedVariable("len", i32_type()));
+    prototype.AddArgument(LlvmCodeGen::NamedVariable("seed", i32_type()));
 
     llvm::Value* args[3];
     LlvmBuilder builder(context());
@@ -1562,38 +1543,38 @@ llvm::Function* LlvmCodeGen::GetHashFunction(int num_bytes) {
 
     // Generate the crc instructions starting with the highest number of bytes
     if (num_bytes >= 8) {
-      llvm::Value* result_64 = builder.CreateZExt(result, GetType(TYPE_BIGINT));
-      llvm::Value* ptr = builder.CreateBitCast(data, GetPtrType(TYPE_BIGINT));
+      llvm::Value* result_64 = builder.CreateZExt(result, i64_type());
+      llvm::Value* ptr = builder.CreateBitCast(data, i64_ptr_type());
       int i = 0;
       while (num_bytes >= 8) {
-        llvm::Value* index[] = {GetIntConstant(TYPE_INT, i++)};
+        llvm::Value* index[] = {GetI32Constant(i++)};
         llvm::Value* d = builder.CreateLoad(builder.CreateInBoundsGEP(ptr, index));
         result_64 =
             builder.CreateCall(crc64_fn, llvm::ArrayRef<llvm::Value*>({result_64, d}));
         num_bytes -= 8;
       }
-      result = builder.CreateTrunc(result_64, GetType(TYPE_INT));
-      llvm::Value* index[] = {GetIntConstant(TYPE_INT, i * 8)};
+      result = builder.CreateTrunc(result_64, i32_type());
+      llvm::Value* index[] = {GetI32Constant(i * 8)};
       // Update data to past the 8-byte chunks
       data = builder.CreateInBoundsGEP(data, index);
     }
 
     if (num_bytes >= 4) {
       DCHECK_LT(num_bytes, 8);
-      llvm::Value* ptr = builder.CreateBitCast(data, GetPtrType(TYPE_INT));
+      llvm::Value* ptr = builder.CreateBitCast(data, i32_ptr_type());
       llvm::Value* d = builder.CreateLoad(ptr);
       result = builder.CreateCall(crc32_fn, llvm::ArrayRef<llvm::Value*>({result, d}));
-      llvm::Value* index[] = {GetIntConstant(TYPE_INT, 4)};
+      llvm::Value* index[] = {GetI32Constant(4)};
       data = builder.CreateInBoundsGEP(data, index);
       num_bytes -= 4;
     }
 
     if (num_bytes >= 2) {
       DCHECK_LT(num_bytes, 4);
-      llvm::Value* ptr = builder.CreateBitCast(data, GetPtrType(TYPE_SMALLINT));
+      llvm::Value* ptr = builder.CreateBitCast(data, i16_ptr_type());
       llvm::Value* d = builder.CreateLoad(ptr);
       result = builder.CreateCall(crc16_fn, llvm::ArrayRef<llvm::Value*>({result, d}));
-      llvm::Value* index[] = {GetIntConstant(TYPE_INT, 2)};
+      llvm::Value* index[] = {GetI16Constant(2)};
       data = builder.CreateInBoundsGEP(data, index);
       num_bytes -= 2;
     }
@@ -1606,7 +1587,7 @@ llvm::Function* LlvmCodeGen::GetHashFunction(int num_bytes) {
     }
     DCHECK_EQ(num_bytes, 0);
 
-    llvm::Value* shift_16 = GetIntConstant(TYPE_INT, 16);
+    llvm::Value* shift_16 = GetI32Constant(16);
     llvm::Value* upper_bits = builder.CreateShl(result, shift_16);
     llvm::Value* lower_bits = builder.CreateLShr(result, shift_16);
     result = builder.CreateOr(upper_bits, lower_bits);
@@ -1631,7 +1612,7 @@ static llvm::Function* GetLenOptimizedHashFn(
     // length with num_bytes.
     fn = codegen->CloneFunction(fn);
     llvm::Value* len_arg = codegen->GetArgument(fn, 1);
-    len_arg->replaceAllUsesWith(codegen->GetIntConstant(TYPE_INT, len));
+    len_arg->replaceAllUsesWith(codegen->GetI32Constant(len));
   }
   return codegen->FinalizeFunction(fn);
 }
@@ -1664,7 +1645,7 @@ llvm::Constant* LlvmCodeGen::ConstantToGVPtr(
   llvm::GlobalVariable* gv = new llvm::GlobalVariable(
       *module_, type, true, llvm::GlobalValue::PrivateLinkage, ir_constant, name);
   return llvm::ConstantExpr::getGetElementPtr(
-      NULL, gv, llvm::ArrayRef<llvm::Constant*>({GetIntConstant(TYPE_INT, 0)}));
+      NULL, gv, llvm::ArrayRef<llvm::Constant*>({GetI32Constant(0)}));
 }
 
 llvm::Constant* LlvmCodeGen::ConstantsToGVArrayPtr(llvm::Type* element_type,

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 33fcdba..268ab6d 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -246,24 +246,40 @@ class LlvmCodeGen {
   llvm::PointerType* GetPtrPtrType(llvm::Type* type);
 
   /// Return a pointer to pointer type for 'name' type.
-  llvm::PointerType* GetPtrPtrType(const std::string& name);
+  llvm::PointerType* GetNamedPtrPtrType(const std::string& name);
 
   /// Returns llvm type for Impala's internal representation of this column type,
   /// i.e. the way Impala represents this type in a Tuple.
-  llvm::Type* GetType(const ColumnType& type);
+  llvm::Type* GetSlotType(const ColumnType& type);
 
   /// Return a pointer type to 'type' (e.g. int16_t*)
-  llvm::PointerType* GetPtrType(const ColumnType& type);
+  llvm::PointerType* GetSlotPtrType(const ColumnType& type);
 
   /// Returns the type with 'name'.  This is used to pull types from clang
   /// compiled IR.  The types we generate at runtime are unnamed.
   /// The name is generated by the clang compiler in this form:
   /// <class/struct>.<namespace>::<class name>.  For example:
   /// "class.impala::AggregationNode"
-  llvm::Type* GetType(const std::string& name);
+  llvm::Type* GetNamedType(const std::string& name);
 
-  /// Returns the pointer type of the type returned by GetType(name)
-  llvm::PointerType* GetPtrType(const std::string& name);
+  /// Returns the pointer type of the type returned by GetNamedType(name)
+  llvm::PointerType* GetNamedPtrType(const std::string& name);
+
+  /// Template versions of GetNamed*Type functions that expect the llvm name of
+  /// type T to be T::LLVM_CLASS_NAME. T must be a struct/class, so GetStructType
+  /// can return llvm::StructType* to avoid casting on the caller side.
+  template<class T>
+  llvm::StructType* GetStructType() {
+    return llvm::cast<llvm::StructType>(GetNamedType(T::LLVM_CLASS_NAME));
+  }
+
+  template<class T>
+  llvm::PointerType* GetStructPtrType() { return GetNamedPtrType(T::LLVM_CLASS_NAME); }
+
+  template<class T>
+  llvm::PointerType* GetStructPtrPtrType() {
+    return GetNamedPtrPtrType(T::LLVM_CLASS_NAME);
+  }
 
   /// Alloca's an instance of the appropriate pointer type and sets it to point at 'v'
   llvm::Value* GetPtrTo(LlvmBuilder* builder, llvm::Value* v, const char* name = "");
@@ -475,9 +491,6 @@ class LlvmCodeGen {
   /// c-code and code-generated IR.  The resulting value will be of 'type'.
   llvm::Value* CastPtrToLlvmPtr(llvm::Type* type, const void* ptr);
 
-  /// Returns the constant 'val' of 'type'.
-  llvm::Constant* GetIntConstant(PrimitiveType type, uint64_t val);
-
   /// Returns a constant int of 'byte_size' bytes based on 'low_bits' and 'high_bits'
   /// which stand for the lower and upper 64-bits of the constant respectively. For
   /// values less than or equal to 64-bits, 'high_bits' is not used. This function
@@ -488,21 +501,43 @@ class LlvmCodeGen {
   llvm::Value* GetStringConstant(LlvmBuilder* builder, char* data, int len);
 
   /// Returns true/false constants (bool type)
-  llvm::Value* true_value() { return true_value_; }
-  llvm::Value* false_value() { return false_value_; }
-  llvm::Value* null_ptr_value() { return llvm::ConstantPointerNull::get(ptr_type()); }
+  llvm::Constant* true_value() { return true_value_; }
+  llvm::Constant* false_value() { return false_value_; }
+  llvm::Constant* null_ptr_value() { return llvm::ConstantPointerNull::get(ptr_type()); }
 
   /// Simple wrappers to reduce code verbosity
-  llvm::Type* boolean_type() { return GetType(TYPE_BOOLEAN); }
-  llvm::Type* tinyint_type() { return GetType(TYPE_TINYINT); }
-  llvm::Type* smallint_type() { return GetType(TYPE_SMALLINT); }
-  llvm::Type* int_type() { return GetType(TYPE_INT); }
-  llvm::Type* bigint_type() { return GetType(TYPE_BIGINT); }
-  llvm::Type* float_type() { return GetType(TYPE_FLOAT); }
-  llvm::Type* double_type() { return GetType(TYPE_DOUBLE); }
+  llvm::Type* bool_type() { return llvm::Type::getInt1Ty(context()); }
+  llvm::Type* i8_type() { return llvm::Type::getInt8Ty(context()); }
+  llvm::Type* i16_type() { return llvm::Type::getInt16Ty(context()); }
+  llvm::Type* i32_type() { return llvm::Type::getInt32Ty(context()); }
+  llvm::Type* i64_type() { return llvm::Type::getInt64Ty(context()); }
+  llvm::Type* i128_type() { return llvm::Type::getIntNTy(context(), 128); }
+  llvm::Type* float_type() { return llvm::Type::getFloatTy(context()); }
+  llvm::Type* double_type() { return llvm::Type::getDoubleTy(context()); }
   llvm::PointerType* ptr_type() { return ptr_type_; }
   llvm::Type* void_type() { return void_type_; }
-  llvm::Type* i128_type() { return llvm::Type::getIntNTy(context(), 128); }
+
+  llvm::PointerType* i8_ptr_type() { return GetPtrType(i8_type()); }
+  llvm::PointerType* i16_ptr_type() { return GetPtrType(i16_type()); }
+  llvm::PointerType* i32_ptr_type() { return GetPtrType(i32_type()); }
+  llvm::PointerType* i64_ptr_type() { return GetPtrType(i64_type()); }
+  llvm::PointerType* float_ptr_type() { return GetPtrType(float_type()); }
+  llvm::PointerType* double_ptr_type() { return GetPtrType(double_type()); }
+  llvm::PointerType* ptr_ptr_type() { return GetPtrType(ptr_type_); }
+
+  llvm::Constant* GetBoolConstant(bool val) { return val ? true_value_ : false_value_; }
+  llvm::Constant* GetI8Constant(uint64_t val) {
+    return llvm::ConstantInt::get(context(), llvm::APInt(8, val));
+  }
+  llvm::Constant* GetI16Constant(uint64_t val) {
+    return llvm::ConstantInt::get(context(), llvm::APInt(16, val));
+  }
+  llvm::Constant* GetI32Constant(uint64_t val) {
+    return llvm::ConstantInt::get(context(), llvm::APInt(32, val));
+  }
+  llvm::Constant* GetI64Constant(uint64_t val) {
+    return llvm::ConstantInt::get(context(), llvm::APInt(64, val));
+  }
 
   /// Load the module temporarily and populate 'symbols' with the symbols in the module.
   static Status GetSymbols(const string& file, const string& module_id,
@@ -811,8 +846,8 @@ class LlvmCodeGen {
   llvm::Type* timestamp_value_type_;        // TimestampValue
 
   /// llvm constants to help with code gen verbosity
-  llvm::Value* true_value_;
-  llvm::Value* false_value_;
+  llvm::Constant* true_value_;
+  llvm::Constant* false_value_;
 
   /// The symbol emitted associated with 'execution_engine_'. Methods on
   /// 'symbol_emitter_' are called by 'execution_engine_' when code is emitted or freed.

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index eaf8dd1..5f9f777 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -566,14 +566,14 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
 
   // Construct function signature to match
   // bool EvalConjuncts(ScalarExprEvaluator**, int, TupleRow*)
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
-  llvm::Type* eval_type = codegen->GetType(ScalarExprEvaluator::LLVM_CLASS_NAME);
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+  llvm::Type* eval_type = codegen->GetStructType<ScalarExprEvaluator>();
 
-  LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->GetType(TYPE_BOOLEAN));
+  LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->bool_type());
   prototype.AddArgument(
       LlvmCodeGen::NamedVariable("evals", codegen->GetPtrPtrType(eval_type)));
   prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("num_evals", codegen->GetType(TYPE_INT)));
+      LlvmCodeGen::NamedVariable("num_evals", codegen->i32_type()));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
   LlvmBuilder builder(codegen->context());
@@ -590,7 +590,7 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
       llvm::BasicBlock* true_block =
           llvm::BasicBlock::Create(context, "continue", *fn, false_block);
       llvm::Value* eval_arg_ptr = builder.CreateInBoundsGEP(
-          NULL, evals_arg, codegen->GetIntConstant(TYPE_INT, i), "eval_ptr");
+          NULL, evals_arg, codegen->GetI32Constant(i), "eval_ptr");
       llvm::Value* eval_arg = builder.CreateLoad(eval_arg_ptr, "eval");
 
       // Call conjunct_fns[i]

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 70618df..5c39ff9 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -145,10 +145,10 @@ Status FilterContext::CodegenEval(
   LlvmBuilder builder(context);
 
   *fn = nullptr;
-  llvm::PointerType* this_type = codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  llvm::PointerType* this_type = codegen->GetStructPtrType<FilterContext>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
   LlvmCodeGen::FnPrototype prototype(codegen, "FilterContextEval",
-      codegen->boolean_type());
+      codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
@@ -206,7 +206,7 @@ Status FilterContext::CodegenEval(
 
   // Create a global constant of the filter expression's ColumnType. It needs to be a
   // constant for constant propagation and dead code elimination in 'runtime_filter_fn'.
-  llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
+  llvm::Type* col_type = codegen->GetStructType<ColumnType>();
   llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
       col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
 
@@ -281,8 +281,8 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
   LlvmBuilder builder(context);
 
   *fn = nullptr;
-  llvm::PointerType* this_type = codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  llvm::PointerType* this_type = codegen->GetStructPtrType<FilterContext>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
   LlvmCodeGen::FnPrototype prototype(
       codegen, "FilterContextInsert", codegen->void_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
@@ -306,8 +306,8 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
     llvm::Value* local_min_max_filter_ptr =
         builder.CreateStructGEP(nullptr, this_arg, 4, "local_min_max_filter_ptr");
     llvm::PointerType* min_max_filter_type =
-        codegen->GetPtrType(MinMaxFilter::GetLlvmClassName(filter_expr->type().type))
-            ->getPointerTo();
+        codegen->GetNamedPtrType(MinMaxFilter::GetLlvmClassName(
+        filter_expr->type().type))->getPointerTo();
     local_min_max_filter_ptr = builder.CreatePointerCast(
         local_min_max_filter_ptr, min_max_filter_type, "cast_min_max_filter_ptr");
     local_filter_arg =
@@ -372,13 +372,13 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
   if (ctx->filter->is_bloom_filter()) {
     // Create a global constant of the filter expression's ColumnType. It needs to be a
     // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
-    llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
+    llvm::Type* col_type = codegen->GetStructType<ColumnType>();
     llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
         col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
 
     // Call RawValue::GetHashValue() on the result of the filter's expression.
     llvm::Value* seed_arg =
-        codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
+        codegen->GetI32Constant(RuntimeFilterBank::DefaultHashSeed());
     llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
     llvm::Function* get_hash_value_fn =
         codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index da9f195..38e0d26 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -585,7 +585,7 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen, LlvmBuilder* builder,
   if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) {
     llvm::Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr");
     llvm::Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len");
-    llvm::Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed);
+    llvm::Value* null_len = codegen->GetI32Constant(fnv_seed);
     llvm::Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type());
     builder->CreateStore(null_ptr, dst_ptr);
     builder->CreateStore(null_len, dst_len);
@@ -597,7 +597,7 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen, LlvmBuilder* builder,
       case TYPE_BOOLEAN:
         // In results, booleans are stored as 1 byte
         dst = builder->CreateBitCast(dst, codegen->ptr_type());
-        null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed);
+        null_value = codegen->GetI8Constant(fnv_seed);
         break;
       case TYPE_TIMESTAMP: {
         // Cast 'dst' to 'i128*'
@@ -718,14 +718,10 @@ Status HashTableCtx::CodegenEvalRow(
   }
 
   // Get types to generate function prototype
-  llvm::Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  llvm::PointerType* this_ptr_type = codegen->GetPtrType(this_type);
-  llvm::Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
+  llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HashTableCtx>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
   LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
-      codegen->GetType(TYPE_BOOLEAN));
+      codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type()));
@@ -755,9 +751,9 @@ Status HashTableCtx::CodegenEvalRow(
     // Convert result buffer to llvm ptr type
     int offset = expr_values_cache_.expr_values_offsets(i);
     llvm::Value* loc = builder.CreateInBoundsGEP(
-        NULL, expr_values, codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
-    llvm::Value* llvm_loc =
-        builder.CreatePointerCast(loc, codegen->GetPtrType(exprs[i]->type()), "loc");
+        NULL, expr_values, codegen->GetI32Constant(offset), "loc_addr");
+    llvm::Value* llvm_loc = builder.CreatePointerCast(loc,
+        codegen->GetSlotPtrType(exprs[i]->type()), "loc");
 
     llvm::BasicBlock* null_block = llvm::BasicBlock::Create(context, "null", *fn);
     llvm::BasicBlock* not_null_block = llvm::BasicBlock::Create(context, "not_null", *fn);
@@ -784,9 +780,9 @@ Status HashTableCtx::CodegenEvalRow(
     llvm::Value* is_null = result.GetIsNull();
 
     // Set null-byte result
-    llvm::Value* null_byte = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT));
+    llvm::Value* null_byte = builder.CreateZExt(is_null, codegen->i8_type());
     llvm::Value* llvm_null_byte_loc = builder.CreateInBoundsGEP(
-        NULL, expr_values_null, codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
+        NULL, expr_values_null, codegen->GetI32Constant(i), "null_byte_loc");
     builder.CreateStore(null_byte, llvm_null_byte_loc);
     builder.CreateCondBr(is_null, null_block, not_null_block);
 
@@ -810,7 +806,7 @@ Status HashTableCtx::CodegenEvalRow(
     if (stores_nulls_) {
       // Update has_null
       llvm::PHINode* is_null_phi =
-          builder.CreatePHI(codegen->boolean_type(), 2, "is_null_phi");
+          builder.CreatePHI(codegen->bool_type(), 2, "is_null_phi");
       is_null_phi->addIncoming(codegen->true_value(), null_block);
       is_null_phi->addIncoming(codegen->false_value(), not_null_block);
       has_null = builder.CreateOr(has_null, is_null_phi, "has_null");
@@ -875,12 +871,10 @@ Status HashTableCtx::CodegenHashRow(
   }
 
   // Get types to generate function prototype
-  llvm::Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  llvm::PointerType* this_ptr_type = codegen->GetPtrType(this_type);
+  llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HashTableCtx>();
 
   LlvmCodeGen::FnPrototype prototype(
-      codegen, (use_murmur ? "MurmurHashRow" : "HashRow"), codegen->GetType(TYPE_INT));
+      codegen, (use_murmur ? "MurmurHashRow" : "HashRow"), codegen->i32_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type()));
   prototype.AddArgument(
@@ -907,7 +901,7 @@ Status HashTableCtx::CodegenHashRow(
       llvm::Function* hash_fn = use_murmur ?
           codegen->GetMurmurHashFunction(expr_values_bytes_per_row) :
           codegen->GetHashFunction(expr_values_bytes_per_row);
-      llvm::Value* len = codegen->GetIntConstant(TYPE_INT, expr_values_bytes_per_row);
+      llvm::Value* len = codegen->GetI32Constant(expr_values_bytes_per_row);
       hash_result = builder.CreateCall(
           hash_fn, llvm::ArrayRef<llvm::Value*>({expr_values, len, hash_result}), "hash");
     }
@@ -916,7 +910,7 @@ Status HashTableCtx::CodegenHashRow(
       llvm::Function* hash_fn = use_murmur ?
           codegen->GetMurmurHashFunction(var_result_offset) :
           codegen->GetHashFunction(var_result_offset);
-      llvm::Value* len = codegen->GetIntConstant(TYPE_INT, var_result_offset);
+      llvm::Value* len = codegen->GetI32Constant(var_result_offset);
       hash_result = builder.CreateCall(
           hash_fn, llvm::ArrayRef<llvm::Value*>({expr_values, len, hash_result}), "hash");
     }
@@ -935,7 +929,7 @@ Status HashTableCtx::CodegenHashRow(
 
       int offset = expr_values_cache_.expr_values_offsets(i);
       llvm::Value* llvm_loc = builder.CreateInBoundsGEP(
-          NULL, expr_values, codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
+          NULL, expr_values, codegen->GetI32Constant(offset), "loc_addr");
 
       // If the hash table stores nulls, we need to check if the stringval
       // evaluated to NULL
@@ -945,10 +939,10 @@ Status HashTableCtx::CodegenHashRow(
         continue_block = llvm::BasicBlock::Create(context, "continue", *fn);
 
         llvm::Value* llvm_null_byte_loc = builder.CreateInBoundsGEP(NULL,
-            expr_values_null, codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
+            expr_values_null, codegen->GetI32Constant(i), "null_byte_loc");
         llvm::Value* null_byte = builder.CreateLoad(llvm_null_byte_loc, "null_byte");
         llvm::Value* is_null = builder.CreateICmpNE(
-            null_byte, codegen->GetIntConstant(TYPE_TINYINT, 0), "is_null");
+            null_byte, codegen->GetI8Constant(0), "is_null");
         builder.CreateCondBr(is_null, null_block, not_null_block);
 
         // For null, we just want to call the hash function on the portion of
@@ -957,7 +951,7 @@ Status HashTableCtx::CodegenHashRow(
         llvm::Function* null_hash_fn = use_murmur ?
             codegen->GetMurmurHashFunction(sizeof(StringValue)) :
             codegen->GetHashFunction(sizeof(StringValue));
-        llvm::Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
+        llvm::Value* len = codegen->GetI32Constant(sizeof(StringValue));
         str_null_result = builder.CreateCall(null_hash_fn,
             llvm::ArrayRef<llvm::Value*>({llvm_loc, len, hash_result}), "str_null");
         builder.CreateBr(continue_block);
@@ -967,7 +961,7 @@ Status HashTableCtx::CodegenHashRow(
 
       // Convert expr_values_buffer_ loc to llvm value
       llvm::Value* str_val = builder.CreatePointerCast(
-          llvm_loc, codegen->GetPtrType(TYPE_STRING), "str_val");
+          llvm_loc, codegen->GetSlotPtrType(TYPE_STRING), "str_val");
 
       llvm::Value* ptr = builder.CreateStructGEP(NULL, str_val, 0);
       llvm::Value* len = builder.CreateStructGEP(NULL, str_val, 1);
@@ -986,7 +980,7 @@ Status HashTableCtx::CodegenHashRow(
         // Use phi node to reconcile that we could have come from the string-null
         // path and string not null paths.
         llvm::PHINode* phi_node =
-            builder.CreatePHI(codegen->GetType(TYPE_INT), 2, "hash_phi");
+            builder.CreatePHI(codegen->i32_type(), 2, "hash_phi");
         phi_node->addIncoming(string_hash_result, not_null_block);
         phi_node->addIncoming(str_null_result, null_block);
         hash_result = phi_node;
@@ -1086,14 +1080,10 @@ Status HashTableCtx::CodegenEquals(
   }
 
   // Get types to generate function prototype
-  llvm::Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  llvm::PointerType* this_ptr_type = codegen->GetPtrType(this_type);
-  llvm::Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
-
-  LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
+  llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HashTableCtx>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+
+  LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type()));
@@ -1148,18 +1138,18 @@ Status HashTableCtx::CodegenEquals(
     // predicate is <=>
     if (force_null_equality || finds_nulls_[i]) {
       llvm::Value* llvm_null_byte_loc = builder.CreateInBoundsGEP(
-          NULL, expr_values_null, codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
+          NULL, expr_values_null, codegen->GetI32Constant(i), "null_byte_loc");
       llvm::Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
       row_is_null =
-          builder.CreateICmpNE(null_byte, codegen->GetIntConstant(TYPE_TINYINT, 0));
+          builder.CreateICmpNE(null_byte, codegen->GetI8Constant(0));
     }
 
     // Get llvm value for row_val from 'expr_values'
     int offset = expr_values_cache_.expr_values_offsets(i);
     llvm::Value* loc = builder.CreateInBoundsGEP(
-        NULL, expr_values, codegen->GetIntConstant(TYPE_INT, offset), "loc");
+        NULL, expr_values, codegen->GetI32Constant(offset), "loc");
     llvm::Value* row_val = builder.CreatePointerCast(
-        loc, codegen->GetPtrType(build_exprs_[i]->type()), "row_val");
+        loc, codegen->GetSlotPtrType(build_exprs_[i]->type()), "row_val");
 
     // Branch for GetValue() returning NULL
     builder.CreateCondBr(is_null, null_block, not_null_block);

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 3ec1f09..fe1bed4 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -759,23 +759,18 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
   llvm::LLVMContext& context = codegen->context();
   LlvmBuilder builder(context);
 
-  llvm::Type* this_type = codegen->GetType(HdfsAvroScanner::LLVM_CLASS_NAME);
-  DCHECK(this_type != nullptr);
-  llvm::PointerType* this_ptr_type = llvm::PointerType::get(this_type, 0);
+  llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HdfsAvroScanner>();
 
   TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
   llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
   if (tuple_type == nullptr) return Status("Could not generate tuple struct.");
   llvm::Type* tuple_ptr_type = llvm::PointerType::get(tuple_type, 0);
 
-  llvm::Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_opaque_ptr_type = llvm::PointerType::get(tuple_opaque_type, 0);
+  llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>();
 
-  llvm::Type* data_ptr_type = llvm::PointerType::get(codegen->ptr_type(), 0); // char**
-  llvm::Type* mempool_type =
-      llvm::PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0);
-  llvm::Type* schema_element_type =
-      codegen->GetPtrType(AvroSchemaElement::LLVM_CLASS_NAME);
+  llvm::Type* data_ptr_type = codegen->ptr_ptr_type(); // char**
+  llvm::Type* mempool_type = codegen->GetStructPtrType<MemPool>();
+  llvm::Type* schema_element_type = codegen->GetStructPtrType<AvroSchemaElement>();
 
   // Schema can be null if metadata is stale. See test in
   // queries/QueryTest/avro-schema-changes.test.
@@ -792,7 +787,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(const HdfsScanNodeBase* node,
   std::vector<llvm::Function*> helper_functions;
 
   // prototype re-used several times by amending with SetName()
-  LlvmCodeGen::FnPrototype prototype(codegen, "", codegen->boolean_type());
+  LlvmCodeGen::FnPrototype prototype(codegen, "", codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("record_schema", schema_element_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
@@ -933,9 +928,9 @@ Status HdfsAvroScanner::CodegenReadRecord(const SchemaPath& path,
       llvm::Function* read_union_fn =
           codegen->GetFunction(IRFunction::READ_UNION_TYPE, false);
       llvm::Value* null_union_pos_val =
-          codegen->GetIntConstant(TYPE_INT, field->null_union_position);
+          codegen->GetI32Constant(field->null_union_position);
       if (is_null_ptr == nullptr) {
-        is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->boolean_type(),
+        is_null_ptr = codegen->CreateEntryBlockAlloca(*builder, codegen->bool_type(),
             "is_null_ptr");
       }
       llvm::Value* is_null_ptr_cast =
@@ -1097,7 +1092,7 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanNodeBase* node,
 
   int tuple_byte_size = node->tuple_desc()->byte_size();
   replaced = codegen->ReplaceCallSitesWithValue(fn,
-      codegen->GetIntConstant(TYPE_INT, tuple_byte_size), "tuple_byte_size");
+      codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
   DCHECK_EQ(replaced, 1);
 
   fn->setName("DecodeAvroData");

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index e2260a6..f7b1c31 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -100,6 +100,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
       llvm::Function** decode_avro_data_fn)
       WARN_UNUSED_RESULT;
 
+  static const char* LLVM_CLASS_NAME;
+
  protected:
   /// Implementation of BaseSeqeunceScanner super class methods
   virtual FileHeader* AllocateFileHeader();
@@ -309,8 +311,6 @@ class HdfsAvroScanner : public BaseSequenceScanner {
 
   /// Unit test constructor
   HdfsAvroScanner();
-
-  static const char* LLVM_CLASS_NAME;
 };
 } // namespace impala
 

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 7a10f3c..e279369 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1183,10 +1183,10 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(
   LlvmBuilder builder(context);
 
   *fn = nullptr;
-  llvm::Type* this_type = codegen->GetPtrType(HdfsParquetScanner::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  llvm::Type* this_type = codegen->GetStructPtrType<HdfsParquetScanner>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
   LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
-      codegen->GetType(TYPE_BOOLEAN));
+      codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
@@ -1221,7 +1221,7 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(
           "FilterContext4Eval");
       DCHECK_EQ(replaced, 1);
 
-      llvm::Value* idx = codegen->GetIntConstant(TYPE_INT, i);
+      llvm::Value* idx = codegen->GetI32Constant(i);
       llvm::Value* passed_filter = builder.CreateCall(
           eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, row_arg}));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index b1409d7..f0043b5 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -351,6 +351,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Indicates an invalid position value.
   static const int16_t INVALID_POS = -1;
 
+  /// Class name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
+
  private:
   friend class ParquetColumnReader;
   friend class CollectionColumnReader;
@@ -367,9 +370,6 @@ class HdfsParquetScanner : public HdfsScanner {
       "You can increase FOOTER_SIZE if you want, "
       "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
 
-  /// Class name in LLVM IR.
-  static const char* LLVM_CLASS_NAME;
-
   /// Index of the current row group being processed. Initialized to -1 which indicates
   /// that we have not started processing the first row group yet (GetNext() has not yet
   /// been called).

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index d62ccc7..1809fe5 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -340,26 +340,13 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   node->ComputeSlotMaterializationOrder(&materialize_order);
 
   // Get types to construct matching function signature to WriteCompleteTuple
-  llvm::PointerType* uint8_ptr_type =
-      llvm::PointerType::get(codegen->GetType(TYPE_TINYINT), 0);
-
-  llvm::StructType* field_loc_type = reinterpret_cast<llvm::StructType*>(
-      codegen->GetType(FieldLocation::LLVM_CLASS_NAME));
-  llvm::Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  llvm::Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
-  llvm::Type* mem_pool_type = codegen->GetType(MemPool::LLVM_CLASS_NAME);
-  llvm::Type* hdfs_scanner_type = codegen->GetType(HdfsScanner::LLVM_CLASS_NAME);
-
-  DCHECK(tuple_opaque_type != NULL);
-  DCHECK(tuple_row_type != NULL);
-  DCHECK(field_loc_type != NULL);
-  DCHECK(hdfs_scanner_type != NULL);
-
-  llvm::PointerType* field_loc_ptr_type = llvm::PointerType::get(field_loc_type, 0);
-  llvm::PointerType* tuple_opaque_ptr_type = llvm::PointerType::get(tuple_opaque_type, 0);
-  llvm::PointerType* tuple_row_ptr_type = llvm::PointerType::get(tuple_row_type, 0);
-  llvm::PointerType* mem_pool_ptr_type = llvm::PointerType::get(mem_pool_type, 0);
-  llvm::PointerType* hdfs_scanner_ptr_type = llvm::PointerType::get(hdfs_scanner_type, 0);
+  llvm::PointerType* uint8_ptr_type = codegen->i8_ptr_type();
+
+  llvm::PointerType* field_loc_ptr_type = codegen->GetStructPtrType<FieldLocation>();
+  llvm::PointerType* tuple_opaque_ptr_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+  llvm::PointerType* mem_pool_ptr_type = codegen->GetStructPtrType<MemPool>();
+  llvm::PointerType* hdfs_scanner_ptr_type = codegen->GetStructPtrType<HdfsScanner>();
 
   // Generate the typed llvm struct for the output tuple
   llvm::StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
@@ -369,7 +356,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   // Initialize the function prototype.  This needs to match
   // HdfsScanner::WriteCompleteTuple's signature identically.
   LlvmCodeGen::FnPrototype prototype(
-      codegen, "WriteCompleteTuple", codegen->GetType(TYPE_BOOLEAN));
+      codegen, "WriteCompleteTuple", codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", hdfs_scanner_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mem_pool_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("fields", field_loc_ptr_type));
@@ -408,7 +395,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
   // Put tuple in tuple_row
   llvm::Value* tuple_row_typed =
       builder.CreateBitCast(tuple_row_arg, llvm::PointerType::get(tuple_ptr_type, 0));
-  llvm::Value* tuple_row_idxs[] = {codegen->GetIntConstant(TYPE_INT, node->tuple_idx())};
+  llvm::Value* tuple_row_idxs[] = {codegen->GetI32Constant(node->tuple_idx())};
   llvm::Value* tuple_in_row_addr =
       builder.CreateInBoundsGEP(tuple_row_typed, tuple_row_idxs);
   builder.CreateStore(tuple_arg, tuple_in_row_addr);
@@ -437,15 +424,15 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
 
       // Extract ptr/len from fields
       llvm::Value* data_idxs[] = {
-          codegen->GetIntConstant(TYPE_INT, slot_idx),
-          codegen->GetIntConstant(TYPE_INT, 0),
+          codegen->GetI32Constant(slot_idx),
+          codegen->GetI32Constant(0),
       };
       llvm::Value* len_idxs[] = {
-          codegen->GetIntConstant(TYPE_INT, slot_idx),
-          codegen->GetIntConstant(TYPE_INT, 1),
+          codegen->GetI32Constant(slot_idx),
+          codegen->GetI32Constant(1),
       };
       llvm::Value* error_idxs[] = {
-          codegen->GetIntConstant(TYPE_INT, slot_idx),
+          codegen->GetI32Constant(slot_idx),
       };
       llvm::Value* data_ptr =
           builder.CreateInBoundsGEP(fields_arg, data_idxs, "data_ptr");
@@ -462,10 +449,10 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
       // send a 'need_escape' bool to CodegenWriteSlot(), since we are making the length
       // positive here.
       llvm::Value* len_lt_zero =
-          builder.CreateICmpSLT(len, codegen->GetIntConstant(TYPE_INT, 0), "len_lt_zero");
+          builder.CreateICmpSLT(len, codegen->GetI32Constant(0), "len_lt_zero");
       llvm::Value* ones_compliment_len = builder.CreateNot(len, "ones_compliment_len");
       llvm::Value* positive_len = builder.CreateAdd(
-          ones_compliment_len, codegen->GetIntConstant(TYPE_INT, 1), "positive_len");
+          ones_compliment_len, codegen->GetI32Constant(1), "positive_len");
       len = builder.CreateSelect(len_lt_zero, positive_len, len,
           "select_positive_len");
 
@@ -475,7 +462,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
           slot_fn, llvm::ArrayRef<llvm::Value*>({tuple_arg, data, len}));
       llvm::Value* slot_error = builder.CreateNot(slot_parsed, "slot_parse_error");
       error_in_row = builder.CreateOr(error_in_row, slot_error, "error_in_row");
-      slot_error = builder.CreateZExt(slot_error, codegen->GetType(TYPE_TINYINT));
+      slot_error = builder.CreateZExt(slot_error, codegen->i8_type());
       builder.CreateStore(slot_error, error_ptr);
     }
 
@@ -484,7 +471,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
       // This slots are the last to get materialized.  If we are in this branch, the
       // tuple passed all conjuncts and should be added to the row batch.
       llvm::Value* error_ret =
-          builder.CreateZExt(error_in_row, codegen->GetType(TYPE_TINYINT));
+          builder.CreateZExt(error_in_row, codegen->i8_type());
       builder.CreateStore(error_ret, error_in_row_arg);
       builder.CreateRet(codegen->true_value());
     } else {
@@ -510,7 +497,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanNodeBase* node,
           codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_EVALUATOR, false);
       llvm::Value* eval = builder.CreateCall(
           get_eval_fn, llvm::ArrayRef<llvm::Value*>(
-                           {this_arg, codegen->GetIntConstant(TYPE_INT, conjunct_idx)}));
+                           {this_arg, codegen->GetI32Constant(conjunct_idx)}));
 
       llvm::Value* conjunct_args[] = {eval, tuple_row_arg};
       CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
@@ -559,7 +546,7 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
 
   int tuple_byte_size = node->tuple_desc()->byte_size();
   replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
-      codegen->GetIntConstant(TYPE_INT, tuple_byte_size), "tuple_byte_size");
+      codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
   DCHECK_EQ(replaced, 1);
 
   *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn);
@@ -581,16 +568,16 @@ Status HdfsScanner::CodegenInitTuple(
 
   const TupleDescriptor* tuple_desc = node->tuple_desc();
   replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
-      codegen->GetIntConstant(TYPE_INT, tuple_desc->byte_size()), "tuple_byte_size");
+      codegen->GetI32Constant(tuple_desc->byte_size()), "tuple_byte_size");
   DCHECK_EQ(replaced, 1);
 
   replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
-      codegen->GetIntConstant(TYPE_INT, tuple_desc->null_bytes_offset()),
+      codegen->GetI32Constant(tuple_desc->null_bytes_offset()),
       "null_bytes_offset");
   DCHECK_EQ(replaced, 1);
 
   replaced = codegen->ReplaceCallSitesWithValue(*init_tuple_fn,
-      codegen->GetIntConstant(TYPE_INT, tuple_desc->num_null_bytes()), "num_null_bytes");
+      codegen->GetI32Constant(tuple_desc->num_null_bytes()), "num_null_bytes");
   DCHECK_EQ(replaced, 1);
 
   *init_tuple_fn = codegen->FinalizeFunction(*init_tuple_fn);

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index c90613b..c6c6189 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1477,15 +1477,14 @@ void PartitionedAggregationNode::ClosePartitions() {
 //
 Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
     SlotDescriptor* slot_desc, llvm::Function** fn) {
-  llvm::PointerType* agg_fn_eval_type =
-      codegen->GetPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
+  llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
   llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
   if (tuple_struct == NULL) {
     return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to generate "
                   "intermediate tuple desc");
   }
   llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
 
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
   prototype.AddArgument(
@@ -1548,7 +1547,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen, int a
     llvm::Value* result = agg_fn->is_merge() ?
         builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
         builder.CreateAdd(
-            dst_value, codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
+            dst_value, codegen->GetI64Constant(1), "count_inc");
     builder.CreateStore(result, dst_slot_ptr);
     DCHECK(!slot_desc->is_nullable());
   } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
@@ -1737,15 +1736,11 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
   }
 
   // Get the types to match the UpdateTuple signature
-  llvm::Type* agg_node_type =
-      codegen->GetType(PartitionedAggregationNode::LLVM_CLASS_NAME);
-  llvm::Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
-  llvm::Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-
-  llvm::PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
-  llvm::PointerType* evals_type = codegen->GetPtrPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_type);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
+  llvm::PointerType* agg_node_ptr_type =
+      codegen->GetStructPtrType<PartitionedAggregationNode>();
+  llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
+  llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
 
   llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
   llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
@@ -1754,7 +1749,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
   prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->boolean_type()));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
 
   LlvmBuilder builder(codegen->context());
   llvm::Value* args[5];
@@ -1777,7 +1772,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(
       // TODO: we should be able to hoist this up to the loop over the batch and just
       // increment the slot by the number of rows in the batch.
       int field_idx = slot_desc->llvm_field_idx();
-      llvm::Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
+      llvm::Value* const_one = codegen->GetI64Constant(1);
       llvm::Value* slot_ptr =
           builder.CreateStructGEP(NULL, tuple_arg, field_idx, "src_slot");
       llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
@@ -1834,8 +1829,7 @@ Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen,
 
     // Replace prefetch_mode with constant so branches can be optimised out.
     llvm::Value* prefetch_mode_arg = codegen->GetArgument(process_batch_fn, 3);
-    prefetch_mode_arg->replaceAllUsesWith(llvm::ConstantInt::get(
-        llvm::Type::getInt32Ty(codegen->context()), prefetch_mode));
+    prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
 
     // The codegen'd ProcessBatch function is only used in Open() with level_ = 0,
     // so don't use murmur hash
@@ -1898,13 +1892,11 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming(
 
   // Make needs_serialize arg constant so dead code can be optimised out.
   llvm::Value* needs_serialize_arg = codegen->GetArgument(process_batch_streaming_fn, 2);
-  needs_serialize_arg->replaceAllUsesWith(llvm::ConstantInt::get(
-      llvm::Type::getInt1Ty(codegen->context()), needs_serialize_));
+  needs_serialize_arg->replaceAllUsesWith(codegen->GetBoolConstant(needs_serialize_));
 
   // Replace prefetch_mode with constant so branches can be optimised out.
   llvm::Value* prefetch_mode_arg = codegen->GetArgument(process_batch_streaming_fn, 3);
-  prefetch_mode_arg->replaceAllUsesWith(
-      llvm::ConstantInt::get(llvm::Type::getInt32Ty(codegen->context()), prefetch_mode));
+  prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
 
   llvm::Function* update_tuple_fn;
   RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));

http://git-wip-us.apache.org/repos/asf/impala/blob/632ee044/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 53e58c1..194bb92 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -820,8 +820,7 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function
 
   llvm::Value* is_null_aware_arg = codegen->GetArgument(process_build_batch_fn, 5);
   is_null_aware_arg->replaceAllUsesWith(
-      llvm::ConstantInt::get(llvm::Type::getInt1Ty(codegen->context()),
-          join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
+      codegen->GetBoolConstant(join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
 
   llvm::Function* process_build_batch_fn_level0 =
       codegen->CloneFunction(process_build_batch_fn);
@@ -830,8 +829,8 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function
   // Note that the first argument of this function is the return value.
   llvm::Value* build_filter_l0_arg =
       codegen->GetArgument(process_build_batch_fn_level0, 4);
-  build_filter_l0_arg->replaceAllUsesWith(llvm::ConstantInt::get(
-      llvm::Type::getInt1Ty(codegen->context()), filter_ctxs_.size() > 0));
+  build_filter_l0_arg->replaceAllUsesWith(
+      codegen->GetBoolConstant(filter_ctxs_.size() > 0));
 
   // process_build_batch_fn_level0 uses CRC hash if available,
   replaced =
@@ -847,8 +846,7 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function
   // filters during the level0 build. Note that the first argument of this function is the
   // return value.
   llvm::Value* build_filter_arg = codegen->GetArgument(process_build_batch_fn, 4);
-  build_filter_arg->replaceAllUsesWith(
-      llvm::ConstantInt::get(llvm::Type::getInt1Ty(codegen->context()), false));
+  build_filter_arg->replaceAllUsesWith(codegen->false_value());
 
   // Finalize ProcessBuildBatch functions
   process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
@@ -885,8 +883,7 @@ Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash
   llvm::Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
   DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
   DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
-  prefetch_mode_arg->replaceAllUsesWith(
-      llvm::ConstantInt::get(llvm::Type::getInt32Ty(codegen->context()), prefetch_mode));
+  prefetch_mode_arg->replaceAllUsesWith(codegen->GetI32Constant(prefetch_mode));
 
   // Use codegen'd EvalBuildRow() function
   int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
@@ -955,8 +952,8 @@ Status PhjBuilder::CodegenInsertRuntimeFilters(
   LlvmBuilder builder(context);
 
   *fn = nullptr;
-  llvm::Type* this_type = codegen->GetPtrType(PhjBuilder::LLVM_CLASS_NAME);
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  llvm::Type* this_type = codegen->GetStructPtrType<PhjBuilder>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
   LlvmCodeGen::FnPrototype prototype(
       codegen, "InsertRuntimeFilters", codegen->void_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
@@ -971,8 +968,7 @@ Status PhjBuilder::CodegenInsertRuntimeFilters(
     llvm::Function* insert_fn;
     RETURN_IF_ERROR(FilterContext::CodegenInsert(
         codegen, filter_exprs_[i], &filter_ctxs_[i], &insert_fn));
-    llvm::PointerType* filter_context_type =
-        codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
+    llvm::PointerType* filter_context_type = codegen->GetStructPtrType<FilterContext>();
     llvm::Value* filter_context_ptr =
         codegen->CastPtrToLlvmPtr(filter_context_type, &filter_ctxs_[i]);
 


[5/6] impala git commit: KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

Posted by ta...@apache.org.
KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

This fixes a bug which caused RaftConsensusITest.TestLargeBatches to
fail when run under stress, as in the following command line:

taskset -c 0-4 \
 build/latest/bin/raft_consensus-itest \
   --gtest_filter=\*LargeBat\* \
   --stress-cpu-threads=8

This would produce an error like:
Network error: failed to write to TLS socket: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry:s3_pkt.c:878

This means that we were retrying a write after getting EAGAIN, but with
a different buffer than the first time.

I tracked this down to mishandling of temporary socket errors in
TlsSocket::Writev(). In the case that we successfully write part of the
io vector but hit such an error trying to write a later element in the
vector, we were still propagating the error back up to the caller. The
caller didn't realize that part of the write was successful, and thus it
would retry the write from the beginning.

The fix is to report such a partial write as a success from Writev(),
and also to enable partial writes in TlsContext. The new test fails if
either of these two changes is backed out.

Change-Id: If797f220f42bfb2e6f452b66f15e7a758e883472
Reviewed-on: http://gerrit.cloudera.org:8080/8570
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9361
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/546d1dd0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/546d1dd0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/546d1dd0

Branch: refs/heads/2.x
Commit: 546d1dd083667aa0726b75b1fd093fde294482d1
Parents: e7261e3
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 22:55:44 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 09:05:22 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/tls_context.cc |   2 +-
 be/src/kudu/security/tls_socket.cc  |  10 +-
 security/tls_socket-test.cc         | 277 +++++++++++++++++++++++--------
 3 files changed, 219 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/546d1dd0/be/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.cc b/be/src/kudu/security/tls_context.cc
index b6ec57f..f94e3d2 100644
--- a/be/src/kudu/security/tls_context.cc
+++ b/be/src/kudu/security/tls_context.cc
@@ -96,7 +96,7 @@ Status TlsContext::Init() {
   if (!ctx_) {
     return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
   }
-  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY);
+  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
 
   // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
   // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not

http://git-wip-us.apache.org/repos/asf/impala/blob/546d1dd0/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index f725a49..e68cdce 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -25,6 +25,7 @@
 #include "kudu/security/cert.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/net/socket.h"
 
 namespace kudu {
 namespace security {
@@ -42,11 +43,11 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
   CHECK(ssl_);
   SCOPED_OPENSSL_NO_PENDING_ERRORS;
 
+  *nwritten = 0;
   if (PREDICT_FALSE(amt == 0)) {
     // Writing an empty buffer is a no-op. This happens occasionally, eg in the
     // case where the response has an empty sidecar. We have to special case
     // it, because SSL_write can return '0' to indicate certain types of errors.
-    *nwritten = 0;
     return Status::OK();
   }
 
@@ -61,7 +62,6 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
                                     ErrnoToString(save_errno), save_errno);
       }
       // Socket not ready to write yet.
-      *nwritten = 0;
       return Status::OK();
     }
     return Status::NetworkError("failed to write to TLS socket",
@@ -90,6 +90,12 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
   }
   RETURN_NOT_OK(SetTcpCork(0));
   *nwritten = total_written;
+  // If we did manage to write something, but not everything, due to a temporary socket
+  // error, then we should still return an OK status indicating a successful _partial_
+  // write.
+  if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
+    return Status::OK();
+  }
   return write_status;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/546d1dd0/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
index a978e68..214d2bf 100644
--- a/security/tls_socket-test.cc
+++ b/security/tls_socket-test.cc
@@ -17,7 +17,11 @@
 
 #include "kudu/security/tls_handshake.h"
 
+#include <algorithm>
 #include <pthread.h>
+#include <sched.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
 
 #include <atomic>
 #include <csignal>
@@ -28,6 +32,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -38,6 +43,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -46,25 +53,27 @@
 using std::string;
 using std::thread;
 using std::unique_ptr;
-
+using std::vector;
 
 namespace kudu {
 namespace security {
 
+const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+
+// Size is big enough to not fit into output socket buffer of default size
+// (controlled by setsockopt() with SO_SNDBUF).
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
 
 class TlsSocketTest : public KuduTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
-
     ASSERT_OK(client_tls_.Init());
-    ASSERT_OK(server_tls_.Init());
-    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
   }
 
  protected:
+  void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);
   TlsContext client_tls_;
-  TlsContext server_tls_;
 };
 
 Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
@@ -101,19 +110,112 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
   return Status::OK();
 }
 
+void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+  *sock = std::move(client_sock);
+}
+
+class EchoServer {
+ public:
+  EchoServer()
+      : pthread_sync_(1) {
+  }
+  ~EchoServer() {
+    Stop();
+    Join();
+  }
+
+  void Start() {
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
+    ASSERT_OK(listener_.Init(0));
+    ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
+    ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
+
+    thread_ = thread([&] {
+        pthread_ = pthread_self();
+        pthread_sync_.CountDown();
+        unique_ptr<Socket> sock(new Socket());
+        Sockaddr remote;
+        CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
+
+        TlsHandshake server;
+        CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+        CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+        CHECK_OK(server.Finish(&sock));
+
+        CHECK_OK(sock->SetRecvTimeout(kTimeout));
+        unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+        // An "echo" loop for kEchoChunkSize byte buffers.
+        while (!stop_) {
+          size_t n;
+          Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+          }
+
+          LOG(INFO) << "server echoing " << n << " bytes";
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+          }
+          if (slow_read_) {
+            SleepFor(MonoDelta::FromMilliseconds(10));
+          }
+        }
+      });
+  }
+
+  void EnableSlowRead() {
+    slow_read_ = true;
+  }
+
+  const Sockaddr& listen_addr() const {
+    return listen_addr_;
+  }
+
+  bool stopped() const {
+    return stop_;
+  }
+
+  void Stop() {
+    stop_ = true;
+  }
+  void Join() {
+    thread_.join();
+  }
+
+  const pthread_t& pthread() {
+    pthread_sync_.Wait();
+    return pthread_;
+  }
+
+ private:
+  TlsContext server_tls_;
+  Socket listener_;
+  Sockaddr listen_addr_;
+  thread thread_;
+  pthread_t pthread_;
+  CountDownLatch pthread_sync_;
+  std::atomic<bool> stop_ { false };
+
+  bool slow_read_ = false;
+};
+
 void handler(int /* signal */) {}
 
 // Test for failures to handle EINTR during TLS connection
 // negotiation and data send/receive.
 TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-  Sockaddr listen_addr;
-  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
-  Socket listener;
-  ASSERT_OK(listener.Init(0));
-  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
-  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
-
   // Set up a no-op signal handler for SIGUSR2.
   struct sigaction sa, sa_old;
   memset(&sa, 0, sizeof(sa));
@@ -121,76 +223,117 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
   sigaction(SIGUSR2, &sa, &sa_old);
   SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
 
-  // Size is big enough to not fit into output socket buffer of default size
-  // (controlled by setsockopt() with SO_SNDBUF).
-  constexpr size_t kSize = 32 * 1024 * 1024;
-
-  pthread_t server_tid;
-  CountDownLatch server_tid_sync(1);
-  std::atomic<bool> stop { false };
-  thread server([&] {
-      server_tid = pthread_self();
-      server_tid_sync.CountDown();
-      unique_ptr<Socket> sock(new Socket());
-      Sockaddr remote;
-      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
-
-      TlsHandshake server;
-      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
-      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
-      CHECK_OK(server.Finish(&sock));
-
-      CHECK_OK(sock->SetRecvTimeout(kTimeout));
-      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
-      // An "echo" loop for kSize byte buffers.
-      while (!stop) {
-        size_t n;
-        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
-        if (s.ok()) {
-          size_t written;
-          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
-        }
-        if (!s.ok()) {
-          CHECK(stop) << "unexpected error: " << s.ToString();
-        }
-      }
-    });
-  SCOPED_CLEANUP({ server.join(); });
+  EchoServer server;
+  NO_FATALS(server.Start());
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-    server_tid_sync.Wait();
-    while (!stop) {
-      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
-      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
-    }
-  });
+      while (!server.stopped()) {
+        PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
+        SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+      }
+    });
   SCOPED_CLEANUP({ killer.join(); });
 
-  unique_ptr<Socket> client_sock(new Socket());
-  ASSERT_OK(client_sock->Init(0));
-  ASSERT_OK(client_sock->Connect(listen_addr));
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
 
-  TlsHandshake client;
-  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
-  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
-  ASSERT_OK(client.Finish(&client_sock));
-
-  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
   for (int i = 0; i < 10; i++) {
     SleepFor(MonoDelta::FromMilliseconds(1));
     size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
         MonoTime::Now() + kTimeout));
     size_t n;
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
         MonoTime::Now() + kTimeout));
   }
-  stop = true;
+  server.Stop();
   ASSERT_OK(client_sock->Close());
-
   LOG(INFO) << "client done";
 }
 
+// Return an iovec containing the same data as the buffer 'buf' with the length 'len',
+// but split into random-sized chunks. The chunks are sized randomly between 1 and
+// 'max_chunk_size' bytes.
+vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
+  vector<struct iovec> ret;
+  uint8_t* p = buf;
+  int rem = len;
+  while (rem > 0) {
+    int len = rng->Uniform(max_chunk_size) + 1;
+    len = std::min(len, rem);
+    ret.push_back({p, static_cast<size_t>(len)});
+    p += len;
+    rem -= len;
+  }
+  return ret;
+}
+
+// Regression test for KUDU-2218, a bug in which Writev would improperly handle
+// partial writes in non-blocking mode.
+TEST_F(TlsSocketTest, TestNonBlockingWritev) {
+  Random rng(GetRandomSeed32());
+
+  EchoServer server;
+  server.EnableSlowRead();
+  NO_FATALS(server.Start());
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  int sndbuf = 16 * 1024;
+  CHECK_ERR(setsockopt(client_sock->GetFd(), SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+  unique_ptr<uint8_t[]> rbuf(new uint8_t[kEchoChunkSize]);
+  RandomString(buf.get(), kEchoChunkSize, &rng);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(client_sock->SetNonBlocking(true));
+
+    // Prepare an IOV with the input data split into a bunch of randomly-sized
+    // chunks.
+    vector<struct iovec> iov = ChunkIOVec(&rng, buf.get(), kEchoChunkSize, 1024 * 1024);
+
+    // Loop calling writev until the iov is exhausted
+    int rem = kEchoChunkSize;
+    while (rem > 0) {
+      CHECK(!iov.empty()) << rem;
+      int32_t n;
+      Status s = client_sock->Writev(&iov[0], iov.size(), &n);
+      if (Socket::IsTemporarySocketError(s.posix_code())) {
+        sched_yield();
+        continue;
+      }
+      ASSERT_OK(s);
+      rem -= n;
+      ASSERT_GE(n, 0);
+      while (n > 0) {
+        if (n < iov[0].iov_len) {
+          iov[0].iov_len -= n;
+          iov[0].iov_base = reinterpret_cast<uint8_t*>(iov[0].iov_base) + n;
+          n = 0;
+        } else {
+          n -= iov[0].iov_len;
+          iov.erase(iov.begin());
+        }
+      }
+    }
+    LOG(INFO) << "client waiting";
+
+    size_t n;
+    ASSERT_OK(client_sock->SetNonBlocking(false));
+    ASSERT_OK(client_sock->BlockingRecv(rbuf.get(), kEchoChunkSize, &n,
+        MonoTime::Now() + kTimeout));
+    LOG(INFO) << "client got response";
+
+    ASSERT_EQ(0, memcmp(buf.get(), rbuf.get(), kEchoChunkSize));
+  }
+
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+}
+
 } // namespace security
 } // namespace kudu


[4/6] impala git commit: IMPALA-4874: Increase maximum KRPC message size

Posted by ta...@apache.org.
IMPALA-4874: Increase maximum KRPC message size

The default value for rpc_max_message_size is 50MB.
Impala currently requires support for messages of
up to 2GB. This changes the value of rpc_max_message_size
to INT_MAX for Impala.

Testing:
- Added a test to test_very_large_strings that generates
  a row with multiple large strings. This row requires
  that the RPC framework successfully transmit over
  400MB. This works for both KRPC and Thrift.
  This query operates under the same amount of memory
  as other queries in large_strings.test.
- Tested separately that larger row sizes also work,
  including tests up to almost 2GB.

Change-Id: I876bba0536e1d85e41eacd9c0aeccfe5c2126e58
Reviewed-on: http://gerrit.cloudera.org:8080/9337
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e7261e3e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e7261e3e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e7261e3e

Branch: refs/heads/2.x
Commit: e7261e3eca3685348006b8f46037ffa3552707ad
Parents: 632ee04
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Feb 15 13:10:21 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 09:05:22 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-mgr.cc                              |  7 +++++++
 .../queries/QueryTest/large_strings.test           | 17 +++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e7261e3e/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index d723280..7e05f38 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -57,6 +57,9 @@ DECLARE_string(ssl_minimum_version);
 // Defined in kudu/rpc/rpcz_store.cc
 DECLARE_int32(rpc_duration_too_long_ms);
 
+// Defined in kudu/rpc/transfer.cc
+DECLARE_int32(rpc_max_message_size);
+
 DEFINE_int32(num_acceptor_threads, 2,
     "Number of threads dedicated to accepting connection requests for RPC services");
 DEFINE_int32(num_reactor_threads, 0,
@@ -75,6 +78,10 @@ Status RpcMgr::Init() {
   // Log any RPCs which take longer than 2 minutes.
   FLAGS_rpc_duration_too_long_ms = 2 * 60 * 1000;
 
+  // IMPALA-4874: Impala requires support for messages up to 2GB. Override KRPC's default
+  //              maximum of 50MB.
+  FLAGS_rpc_max_message_size = numeric_limits<int32_t>::max();
+
   MessengerBuilder bld("impala-server");
   const scoped_refptr<MetricEntity> entity(
       METRIC_ENTITY_server.Instantiate(&registry_, "krpc-metrics"));

http://git-wip-us.apache.org/repos/asf/impala/blob/e7261e3e/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
index 4419953..1d930db 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
@@ -209,3 +209,20 @@ select length(madlib_decode_vector(concat_ws(',', s, s, s, s))) from (
 ---- CATCH
 String length larger than allowed limit of 1 GB character data
 =====
+---- QUERY
+# IMPALA-4874: Generate a large row made up of multiple large strings to test RPC
+#              transmission. This uses hashing to make this difficult to compress,
+#              which results in a larger row batch.
+select length(group_concat(h, "!")),
+       length(group_concat(h, "-"))
+from (
+select cast(fnv_hash(concat(l_comment, 'a')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'b')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'c')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'd')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'e')) as string) as h from tpch_parquet.lineitem) a;
+---- TYPES
+INT,INT
+---- RESULTS
+611468161,611468161
+=====
\ No newline at end of file