You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/10/25 05:53:21 UTC

[1/6] incubator-impala git commit: Minor compute stats script fixes

Repository: incubator-impala
Updated Branches:
  refs/heads/master 61fcb4897 -> 13455b5a2


Minor compute stats script fixes

* Change run-step to output full log path
* Change text to say "Computing table stats" rather than "Computing
  HBase stats" when running compute-table-stats.sh

Change-Id: I326f4c370fda8d5e388af8e2395623185c06bc07
Reviewed-on: http://gerrit.cloudera.org:8080/4825
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e0a32721
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e0a32721
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e0a32721

Branch: refs/heads/master
Commit: e0a32721297db9d85da4a1d749672289537aca50
Parents: 61fcb48
Author: Henry Robinson <he...@cloudera.com>
Authored: Mon Oct 24 11:45:46 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 00:13:54 2016 +0000

----------------------------------------------------------------------
 testdata/bin/create-load-data.sh | 4 ++--
 testdata/bin/run-step.sh         | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e0a32721/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 38e38df..7631c1c 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -425,9 +425,9 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
       create-internal-hbase-table
 fi
 
-# TODO: Investigate why all stats are not preserved. Theorectically, we only need to
+# TODO: Investigate why all stats are not preserved. Theoretically, we only need to
 # recompute stats for HBase.
-run-step "Computing HBase stats" compute-hbase-stats.log \
+run-step "Computing table stats" compute-table-stats.log \
     ${IMPALA_HOME}/testdata/bin/compute-table-stats.sh
 
 run-step "Copying auth policy file" copy-auth-policy.log copy-auth-policy

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e0a32721/testdata/bin/run-step.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-step.sh b/testdata/bin/run-step.sh
old mode 100644
new mode 100755
index b7d1671..faef360
--- a/testdata/bin/run-step.sh
+++ b/testdata/bin/run-step.sh
@@ -36,7 +36,7 @@ function run-step {
   fi
   local LOG=${LOG_DIR}/${LOG_FILE_NAME}
 
-  echo -n "${MSG} (logging to ${LOG_FILE_NAME})... "
+  echo -n "${MSG} (logging to ${LOG})... "
   echo "Log for command '$@'" > ${LOG}
   if ! "$@" >> ${LOG} 2>&1 ; then
     echo "FAILED"
@@ -46,4 +46,3 @@ function run-step {
   fi
   echo OK
 }
-


[4/6] incubator-impala git commit: IMPALA-4339: ensure coredumps end up in IMPALA_HOME

Posted by kw...@apache.org.
IMPALA-4339: ensure coredumps end up in IMPALA_HOME

Change-Id: Ibc34d152139653374f940dc3edbca08e749bf55e
Reviewed-on: http://gerrit.cloudera.org:8080/4785
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a6257013
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a6257013
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a6257013

Branch: refs/heads/master
Commit: a6257013fad4d9bd0fcdef805b189a2cc666057a
Parents: 99ed6dc
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 21 15:06:12 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 04:17:58 2016 +0000

----------------------------------------------------------------------
 buildall.sh | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a6257013/buildall.sh
----------------------------------------------------------------------
diff --git a/buildall.sh b/buildall.sh
index d7159e7..d830ae3 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -39,6 +39,9 @@ if ! . "$ROOT"/bin/impala-config.sh; then
   exit 1
 fi
 
+# Change to IMPALA_HOME so that coredumps, etc end up in IMPALA_HOME.
+cd "${IMPALA_HOME}"
+
 # Defaults that are only changable via the commandline.
 CLEAN_ACTION=1
 TESTDATA_ACTION=0


[2/6] incubator-impala git commit: Add distcc infrastructure.

Posted by kw...@apache.org.
Add distcc infrastructure.

This has been working for several months, and it it was written mainly
by Casey Ching while he was at Cloudera working on Impala.

Change-Id: Ia4bc78ad46dda13e4533183195af632f46377cae
Reviewed-on: http://gerrit.cloudera.org:8080/4820
Reviewed-by: Jim Apple <jb...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0eaff805
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0eaff805
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0eaff805

Branch: refs/heads/master
Commit: 0eaff805e28dd4afac134f58b294732e414235ce
Parents: e0a3272
Author: Jim Apple <jb...@cloudera.com>
Authored: Sun Oct 23 14:54:08 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 01:15:50 2016 +0000

----------------------------------------------------------------------
 .gitignore               |   2 +-
 bin/distcc/.gitignore    |   1 +
 bin/distcc/README.md     | 106 ++++++++++++++++++++++++++++
 bin/distcc/distcc.sh     |  62 ++++++++++++++++
 bin/distcc/distcc_env.sh | 160 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 330 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0eaff805/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 849ee61..e63f863 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,7 +13,7 @@ org.eclipse.jdt.ui.prefs
 load-*-generated.sql
 bin/version.info
 
-# Cloudera distcc options
+# distcc options
 .impala_compiler_opts
 
 pprof.out

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0eaff805/bin/distcc/.gitignore
----------------------------------------------------------------------
diff --git a/bin/distcc/.gitignore b/bin/distcc/.gitignore
new file mode 100644
index 0000000..ce71f70
--- /dev/null
+++ b/bin/distcc/.gitignore
@@ -0,0 +1 @@
+ld

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0eaff805/bin/distcc/README.md
----------------------------------------------------------------------
diff --git a/bin/distcc/README.md b/bin/distcc/README.md
new file mode 100644
index 0000000..2de7d4a
--- /dev/null
+++ b/bin/distcc/README.md
@@ -0,0 +1,106 @@
+# Distcc
+Distcc will speed up compilation by distributing compilation tasks to remote build
+machines. The scripts in this folder make using distcc easier.
+
+# Requirements
+
+The only requirement you should need to be aware of is, the scripts in this folder were
+only tested on Linux. If you are using OS X, things probably won't work out of the box.
+
+Assuming you are using Linux, if you use the scripts in this folder, there shouldn't be
+any other requirements. The distcc program should be installed and configured
+automatically. Still, understanding what is involved could be useful.
+
+**You shouldn't need to do any of this, this scripts do this for you.**
+
+1. Install distcc and ccache. Most Linux distros have these packages. The scripts will
+   install it if you have a yum or apt-get based system. Otherwise you should install
+   distcc and ccache yourself through whatever package manager your system uses.
+1. Configure the remote distcc hosts. Set your environment variable BUILD_FARM to
+   "host1/limit1,lzo host2/limit2,lzo" and so on.
+1. Your local compiler needs to be at the same path as it is on the remote build slaves.
+   That path is /opt/Impala-Toolchain/<gcc-version-folder>/bin/gcc. In other words, make
+   sure the Impala toolchain is available at /opt/Impala-Toolchain. That can be done
+   through a symlink, and that's what the scripts will attempt to setup.
+
+# Usage
+
+### First time
+1. Source bin/impala-config.sh in the Impala repo. Step #2 depends on this.
+
+        source "$IMPALA_HOME"/bin/impala-config.sh
+
+1. Source "distcc_env.sh" in this directory. The script will attempt to install distcc
+   if needed.
+
+        source "$IMPALA_AUX_TEST_HOME"/distcc/distcc_env.sh
+
+1. Run buildall.sh. The main purpose is to regenerate cmakefiles.
+
+        cd "$IMPALA_HOME"
+        ./buildall.sh -skiptests -so   # Do not use -noclean
+
+   You should notice that the build runs quite a bit faster.
+
+### Incremental builds
+At this point you no longer need to run the heavyweight buildall.sh. After editing files
+you can either
+```
+make -j$(distcc -j)
+```
+or
+```
+bin/make_impala.sh
+```
+
+### Switiching back to local compilation
+If you want to compile a very small change, a local build might be faster.
+```
+switch_compiler local
+```
+to switch back
+```
+switch_compiler distcc
+```
+
+### Switch to clang++
+Clang is faster and gives better error messages. This setup is still somewhat
+experimental.
+```
+switch_compiler clang
+```
+to switch back
+```
+switch_compiler gcc
+```
+
+### Second time
+If you open a new terminal and attempt to build with "make" or "bin/make_impala.sh",
+that will fail. To fix:
+```
+source "$IMPALA_HOME"/bin/impala-config.sh   # Skip if already done
+source "$IMPALA_HOME"/bin/distcc/distcc_env.sh
+```
+
+# Setting up a new distcc server
+
+1. Install "distccd" and "ccache".
+1. Configure distccd (edit /etc/sysconfig/distccd on a RHEL server) with the options
+   OPTIONS="--jobs 96 --allow YOUR.IP.ADDRESS.HERE --log-level=warn --nice=-15"
+   Where num jobs = 2x the number of cores on the machine. (2x is recommended by distcc.)
+1. Start distcc.
+1. Edit distcc_env.sh to include the new host.
+1. Install all gcc and binutils versions from the toolchain into /opt/Impala-Toolchain.
+1. ccache stores its cache in $HOME/.ccache. Assuming distcc is running as a non-root user
+   that has no $HOME, you must sudo mkdir /.ccache, then sudo chmod 777 /.ccache.
+1. If distcc runs as "nobody", sudo -u nobody ccache -M 25G. This sets the size of the
+   cache to 25GB. Adjust to your taste.
+
+# Misc notes
+
+1. "pump" doesn't work. Many compilation attempts time out say something like "Include
+   server did not process the request in 3.8 seconds". distcc tries to copy 3rd party
+   headers to the remote hosts and that may be the problem. If we could get the include
+   server to use the remote 3rd party headers that should help.
+1. Having a different local Linux OS on your development machine than on the distcc hosts
+   should be fine.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0eaff805/bin/distcc/distcc.sh
----------------------------------------------------------------------
diff --git a/bin/distcc/distcc.sh b/bin/distcc/distcc.sh
new file mode 100755
index 0000000..a1136e8
--- /dev/null
+++ b/bin/distcc/distcc.sh
@@ -0,0 +1,62 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if [[ -z "$DISTCC_HOSTS" || -z "$IMPALA_REAL_CXX_COMPILER" ]]; then
+  # This could be sourced here and the build would work but the parallelization (-j)
+  # should be wrong at this point and it's too late to fix.
+  DIR=$(dirname "$0")
+  echo "You must source '$DIR/distcc_env.sh' before attempting to build." 1>&2
+  exit 1
+fi
+
+TOOLCHAIN_DIR=/opt/Impala-Toolchain
+if [[ ! -d "$TOOLCHAIN_DIR" ]]; then
+  if [[ -n "$IMPALA_TOOLCHAIN" && -d "$IMPALA_TOOLCHAIN" ]]; then
+    if ! sudo -n -- ln -s "$IMPALA_TOOLCHAIN" "$TOOLCHAIN_DIR" &>/dev/null; then
+      echo The toolchain must be available at $TOOLCHAIN_DIR for distcc. \
+          Try running '"sudo ln -s $IMPALA_TOOLCHAIN $TOOLCHAIN_DIR"'. 1>&2
+      exit 1
+    fi
+  fi
+  echo "The toolchain wasn't found at '$TOOLCHAIN_DIR' and IMPALA_TOOLCHAIN is not set." \
+      Make sure the toolchain is available at $TOOLCHAIN_DIR and try again. 1>&2
+  exit 1
+fi
+
+CMD=
+CMD_POST_ARGS=
+if $IMPALA_USE_DISTCC; then
+  CMD="distcc ccache"
+fi
+
+GCC_ROOT="$TOOLCHAIN_DIR/gcc-$IMPALA_GCC_VERSION"
+case "$IMPALA_REAL_CXX_COMPILER" in
+  gcc) CMD+=" $GCC_ROOT/bin/g++";;
+  clang) # Assume the compilation options were setup for gcc, which would happen using
+         # default build options. Now some additional options need to be added for clang.
+         CMD+=" $TOOLCHAIN_DIR/llvm-$IMPALA_LLVM_ASAN_VERSION/bin/clang++"
+         CMD+=" --gcc-toolchain=$GCC_ROOT"
+         # -Wno-unused-local-typedef needs to go after -Wall
+         # -Wno-error is needed, clang generates more warnings than gcc.
+         CMD_POST_ARGS+=" -Wno-unused-local-typedef -Wno-error";;
+  *) echo "Unexpected IMPALA_REAL_CXX_COMPILER: '$IMPALA_REAL_CXX_COMPILER'" 1>&2
+     exit 1;;
+esac
+
+exec $CMD "$@" $CMD_POST_ARGS

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0eaff805/bin/distcc/distcc_env.sh
----------------------------------------------------------------------
diff --git a/bin/distcc/distcc_env.sh b/bin/distcc/distcc_env.sh
new file mode 100644
index 0000000..173cc18
--- /dev/null
+++ b/bin/distcc/distcc_env.sh
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file is intended to be sourced by a shell (zsh and bash have been tested).
+
+if [[ -z $BUILD_FARM ]]
+then
+  echo "BUILD_FARM must be set to configure distcc" >&2
+  return 1
+fi
+
+if [[ ! -z $ZSH_NAME ]]; then
+  DISTCC_ENV_DIR=$(cd $(dirname ${(%):-%x}) && pwd)
+else
+  DISTCC_ENV_DIR=$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)
+fi
+
+function cmd_exists {
+  which $1 &>/dev/null
+}
+
+INSTALLER=
+if cmd_exists apt-get; then
+  INSTALLER=apt-get
+elif cmd_exists yum; then
+  INSTALLER=yum
+fi
+
+if ! cmd_exists distcc; then
+  echo distcc command not found, attempting installation
+  if [[ -z $INSTALLER ]] || ! sudo $INSTALLER -y install distcc; then
+    echo Unable to automatically install distcc. You need to install it manually. 1>&2
+    return 1
+  fi
+fi
+
+# Install CCache if necessary.
+if ! cmd_exists ccache; then
+  echo "ccache command not found, attempting installation"
+  if [[ -z $INSTALLER ]] || ! sudo $INSTALLER -y install ccache; then
+    echo "Unable to automatically install ccache"
+    return 1
+  fi
+fi
+
+# Don't include localhost in the list. It is already the slowest part of the build because
+# it needs to do preprocessing and linking. There shouldn't be a need to add an extra
+# compilation worker.
+export DISTCC_HOSTS=
+DISTCC_HOSTS+=" --localslots=$(nproc)"
+DISTCC_HOSTS+=" --localslots_cpp=$(nproc)"
+DISTCC_HOSTS+=" --randomize"
+DISTCC_HOSTS+=" ${BUILD_FARM}"
+
+# The compiler that distcc.sh should use: gcc or clang.
+: ${IMPALA_REAL_CXX_COMPILER=}
+export IMPALA_REAL_CXX_COMPILER
+
+# Set to false to use local compilation instead of distcc.
+: ${IMPALA_USE_DISTCC=}
+export IMPALA_USE_DISTCC
+
+# Even after generating make files, some state about compiler options would only exist in
+# environment vars. Any such vars should be saved to this file so they can be restored.
+if [[ -z "$IMPALA_HOME" ]]; then
+  echo '$IMPALA_HOME must be set before sourcing this file.' 1>&2
+  return 1
+fi
+IMPALA_COMPILER_CONFIG_FILE="$IMPALA_HOME/.impala_compiler_opts"
+
+# Completely disable anything that could have been setup using this script and clean
+# the make files.
+function disable_distcc {
+  export IMPALA_CXX_COMPILER=default
+  export IMPALA_BUILD_THREADS=$(nproc)
+  save_compiler_opts
+  if ! clean_cmake_files; then
+    echo Failed to clean cmake files. 1>&2
+    return 1
+  fi
+  echo "distcc is not fully disabled, run 'buildall.sh' to complete the change." \
+    "Run 'enable_distcc' to enable."
+}
+
+function enable_distcc {
+  export IMPALA_CXX_COMPILER="$DISTCC_ENV_DIR"/distcc.sh
+  switch_compiler distcc gcc
+  export IMPALA_BUILD_THREADS=$(distcc -j)
+  if ! clean_cmake_files; then
+    echo Failed to clean cmake files. 1>&2
+    return 1
+  fi
+  echo "distcc is not fully enabled, run 'buildall.sh' to complete the change." \
+    "Run 'disable_distcc' or 'switch_compiler local' to disable."
+}
+
+# Cleans old CMake files, this is required when switching between distcc.sh and direct
+# compilation.
+function clean_cmake_files {
+  if [[ -z "$IMPALA_HOME" || ! -d "$IMPALA_HOME" ]]; then
+    echo IMPALA_HOME=$IMPALA_HOME is not valid. 1>&2
+    return 1
+  fi
+  # Copied from $IMPALA_HOME/bin/clean.sh.
+  find "$IMPALA_HOME" -iname '*cmake*' -not -name CMakeLists.txt \
+      -not -path '*cmake_modules*' \
+      -not -path '*thirdparty*'  | xargs rm -rf
+}
+
+function switch_compiler {
+  for ARG in "$@"; do
+    case "$ARG" in
+      "local")
+        IMPALA_USE_DISTCC=false
+        IMPALA_BUILD_THREADS=$(nproc);;
+      distcc)
+        IMPALA_USE_DISTCC=true
+        IMPALA_BUILD_THREADS=$(distcc -j);;
+      gcc) IMPALA_REAL_CXX_COMPILER=gcc;;
+      clang) IMPALA_REAL_CXX_COMPILER=clang;;
+      *) echo "Valid compiler options are:
+    'local'  - Don't use distcc and set -j value to $(nproc). (gcc/clang) remains unchanged.
+    'distcc' - Use distcc and set -j value to $(distcc -j). (gcc/clang) remains unchanged.
+    'gcc'    - Use gcc. (local/distcc remains unchanged).
+    'clang'  - Use clang. (local/distcc remains unchanged)." 2>&1
+        return 1;;
+    esac
+  done
+  save_compiler_opts
+}
+
+function save_compiler_opts {
+  rm -f "$IMPALA_COMPILER_CONFIG_FILE"
+  cat <<EOF > "$IMPALA_COMPILER_CONFIG_FILE"
+IMPALA_CXX_COMPILER=$IMPALA_CXX_COMPILER
+IMPALA_BUILD_THREADS=$IMPALA_BUILD_THREADS
+IMPALA_USE_DISTCC=$IMPALA_USE_DISTCC
+IMPALA_REAL_CXX_COMPILER=$IMPALA_REAL_CXX_COMPILER
+EOF
+}
+
+if [[ -e "$IMPALA_COMPILER_CONFIG_FILE" ]]; then
+  source "$IMPALA_COMPILER_CONFIG_FILE"
+else
+  enable_distcc
+fi


[6/6] incubator-impala git commit: IMPALA-3884: Support TYPE_TIMESTAMP for HashTableCtx::CodegenAssignNullValue()

Posted by kw...@apache.org.
IMPALA-3884: Support TYPE_TIMESTAMP for HashTableCtx::CodegenAssignNullValue()

This change implements support for TYPE_TIMESTAMP for
HashTableCtx::CodegenAssignNullValue(). TimestampValue itself
is 16 bytes in size. To match RawValue::Write() in the
interpreted path, CodegenAssignNullValue() emits code to assign
HashUtil::FNV_SEED to both the upper and lower 64-bit of the
destination value. This change also fixes the handling of 128-bit
Decimal16Value in CodegenAssignNullValue() so the emitted code
matches the behavior of the interpreted path.

Change-Id: I0211d38cbef46331e0006fa5ed0680e6e0867bc8
Reviewed-on: http://gerrit.cloudera.org:8080/4794
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/13455b5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/13455b5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/13455b5a

Branch: refs/heads/master
Commit: 13455b5a24a9d4d009d1dd0d72944c6cacd54829
Parents: aa7741a
Author: Michael Ho <kw...@cloudera.com>
Authored: Fri Oct 21 15:00:56 2016 -0700
Committer: Michael Ho <kw...@cloudera.com>
Committed: Tue Oct 25 05:52:33 2016 +0000

----------------------------------------------------------------------
 be/src/codegen/codegen-anyval.cc                |  8 ++---
 be/src/codegen/llvm-codegen.cc                  | 10 ++++--
 be/src/codegen/llvm-codegen.h                   |  9 +++--
 be/src/exec/hash-table.cc                       | 36 ++++++++++++--------
 be/src/exec/old-hash-table.cc                   | 32 +++++++++++------
 be/src/util/bit-util.h                          |  4 +++
 .../queries/QueryTest/joins.test                | 10 ++++++
 7 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/codegen/codegen-anyval.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.cc b/be/src/codegen/codegen-anyval.cc
index a2df356..57c08b5 100644
--- a/be/src/codegen/codegen-anyval.cc
+++ b/be/src/codegen/codegen-anyval.cc
@@ -380,12 +380,8 @@ void CodegenAnyVal::SetVal(int64_t val) {
 
 void CodegenAnyVal::SetVal(int128_t val) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
-  // TODO: is there a better way to do this?
-  // Set high bits
-  Value* ir_val = ConstantInt::get(codegen_->i128_type(), HighBits(val));
-  ir_val = builder_->CreateShl(ir_val, 64, "tmp");
-  // Set low bits
-  ir_val = builder_->CreateOr(ir_val, LowBits(val), "tmp");
+  vector<uint64_t> vals({LowBits(val), HighBits(val)});
+  Value* ir_val = ConstantInt::get(codegen_->context(), APInt(128, vals));
   SetVal(ir_val);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index d43ad6e..30cc7d0 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -564,7 +564,7 @@ Value* LlvmCodeGen::CastPtrToLlvmPtr(Type* type, const void* ptr) {
   return ConstantExpr::getIntToPtr(const_int, type);
 }
 
-Value* LlvmCodeGen::GetIntConstant(PrimitiveType type, int64_t val) {
+Value* LlvmCodeGen::GetIntConstant(PrimitiveType type, uint64_t val) {
   switch (type) {
     case TYPE_TINYINT:
       return ConstantInt::get(context(), APInt(8, val));
@@ -580,8 +580,12 @@ Value* LlvmCodeGen::GetIntConstant(PrimitiveType type, int64_t val) {
   }
 }
 
-Value* LlvmCodeGen::GetIntConstant(int num_bytes, int64_t val) {
-  return ConstantInt::get(context(), APInt(8 * num_bytes, val));
+Value* LlvmCodeGen::GetIntConstant(int num_bytes, uint64_t low_bits, uint64_t high_bits) {
+  DCHECK_GE(num_bytes, 1);
+  DCHECK_LE(num_bytes, 16);
+  DCHECK(BitUtil::IsPowerOf2(num_bytes));
+  vector<uint64_t> vals({low_bits, high_bits});
+  return ConstantInt::get(context(), APInt(8 * num_bytes, vals));
 }
 
 AllocaInst* LlvmCodeGen::CreateEntryBlockAlloca(Function* f, const NamedVariable& var) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 0b81701..602dc5d 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -380,10 +380,13 @@ class LlvmCodeGen {
   llvm::Value* CastPtrToLlvmPtr(llvm::Type* type, const void* ptr);
 
   /// Returns the constant 'val' of 'type'.
-  llvm::Value* GetIntConstant(PrimitiveType type, int64_t val);
+  llvm::Value* GetIntConstant(PrimitiveType type, uint64_t val);
 
-  /// Returns the constant 'val' of the int type of size 'byte_size'.
-  llvm::Value* GetIntConstant(int byte_size, int64_t val);
+  /// Returns a constant int of 'byte_size' bytes based on 'low_bits' and 'high_bits'
+  /// which stand for the lower and upper 64-bits of the constant respectively. For
+  /// values less than or equal to 64-bits, 'high_bits' is not used. This function
+  /// can generate constant up to 128-bit wide. 'byte_size' must be power of 2.
+  llvm::Value* GetIntConstant(int byte_size, uint64_t low_bits, uint64_t high_bits);
 
   /// Returns true/false constants (bool type)
   llvm::Value* true_value() { return true_value_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index c39e9e9..74fc534 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -569,41 +569,51 @@ string HashTable::PrintStats() const {
 // we'll pick a more random value.
 static void CodegenAssignNullValue(LlvmCodeGen* codegen,
     LlvmCodeGen::LlvmBuilder* builder, Value* dst, const ColumnType& type) {
-  int64_t fvn_seed = HashUtil::FNV_SEED;
+  uint64_t fnv_seed = HashUtil::FNV_SEED;
 
   if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) {
     Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr");
     Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len");
-    Value* null_len = codegen->GetIntConstant(TYPE_INT, fvn_seed);
+    Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed);
     Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type());
     builder->CreateStore(null_ptr, dst_ptr);
     builder->CreateStore(null_len, dst_len);
   } else {
     Value* null_value = NULL;
-    // Get a type specific representation of fvn_seed
+    int byte_size = type.GetByteSize();
+    // Get a type specific representation of fnv_seed
     switch (type.type) {
       case TYPE_BOOLEAN:
         // In results, booleans are stored as 1 byte
         dst = builder->CreateBitCast(dst, codegen->ptr_type());
-        null_value = codegen->GetIntConstant(TYPE_TINYINT, fvn_seed);
+        null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed);
         break;
+      case TYPE_TIMESTAMP: {
+        // Cast 'dst' to 'i128*'
+        DCHECK_EQ(byte_size, 16);
+        PointerType* fnv_seed_ptr_type =
+            codegen->GetPtrType(Type::getIntNTy(codegen->context(), byte_size * 8));
+        dst = builder->CreateBitCast(dst, fnv_seed_ptr_type);
+        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
+        break;
+      }
       case TYPE_TINYINT:
       case TYPE_SMALLINT:
       case TYPE_INT:
       case TYPE_BIGINT:
       case TYPE_DECIMAL:
-        null_value = codegen->GetIntConstant(type.GetByteSize(), fvn_seed);
+        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
         break;
       case TYPE_FLOAT: {
         // Don't care about the value, just the bit pattern
-        float fvn_seed_float = *reinterpret_cast<float*>(&fvn_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fvn_seed_float));
+        float fnv_seed_float = *reinterpret_cast<float*>(&fnv_seed);
+        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_float));
         break;
       }
       case TYPE_DOUBLE: {
         // Don't care about the value, just the bit pattern
-        double fvn_seed_double = *reinterpret_cast<double*>(&fvn_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fvn_seed_double));
+        double fnv_seed_double = *reinterpret_cast<double*>(&fnv_seed);
+        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_double));
         break;
       }
       default:
@@ -685,13 +695,11 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 // becomes the start of the next block for codegen (either the next expr or just the
 // end of the function).
 Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** fn) {
-  // TODO: CodegenAssignNullValue() can't handle TYPE_TIMESTAMP or TYPE_DECIMAL yet
   const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_;
   for (int i = 0; i < ctxs.size(); ++i) {
-    PrimitiveType type = ctxs[i]->root()->type().type;
-    if (type == TYPE_TIMESTAMP || type == TYPE_CHAR) {
-      return Status(Substitute("HashTableCtx::CodegenEvalRow(): type $0 NYI",
-          TypeToString(type)));
+    // Disable codegen for CHAR
+    if (ctxs[i]->root()->type().type == TYPE_CHAR) {
+      return Status("HashTableCtx::CodegenEvalRow(): CHAR NYI");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/exec/old-hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc
index db89782..f51bc74 100644
--- a/be/src/exec/old-hash-table.cc
+++ b/be/src/exec/old-hash-table.cc
@@ -184,41 +184,52 @@ int OldHashTable::AddBloomFilters() {
 // we'll pick a more random value.
 static void CodegenAssignNullValue(LlvmCodeGen* codegen,
     LlvmCodeGen::LlvmBuilder* builder, Value* dst, const ColumnType& type) {
-  int64_t fvn_seed = HashUtil::FNV_SEED;
+  uint64_t fnv_seed = HashUtil::FNV_SEED;
 
   if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) {
     Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr");
     Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len");
-    Value* null_len = codegen->GetIntConstant(TYPE_INT, fvn_seed);
+    Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed);
     Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type());
     builder->CreateStore(null_ptr, dst_ptr);
     builder->CreateStore(null_len, dst_len);
     return;
   } else {
     Value* null_value = NULL;
-    // Get a type specific representation of fvn_seed
+    int byte_size = type.GetByteSize();
+    // Get a type specific representation of fnv_seed
     switch (type.type) {
       case TYPE_BOOLEAN:
         // In results, booleans are stored as 1 byte
         dst = builder->CreateBitCast(dst, codegen->ptr_type());
-        null_value = codegen->GetIntConstant(TYPE_TINYINT, fvn_seed);
+        null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed);
         break;
+      case TYPE_TIMESTAMP: {
+        // Cast 'dst' to 'i128*'
+        DCHECK_EQ(byte_size, 16);
+        PointerType* fnv_seed_ptr_type =
+            codegen->GetPtrType(Type::getIntNTy(codegen->context(), byte_size * 8));
+        dst = builder->CreateBitCast(dst, fnv_seed_ptr_type);
+        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
+        break;
+      }
       case TYPE_TINYINT:
       case TYPE_SMALLINT:
       case TYPE_INT:
       case TYPE_BIGINT:
-        null_value = codegen->GetIntConstant(type.type, fvn_seed);
+      case TYPE_DECIMAL:
+        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
         break;
       case TYPE_FLOAT: {
         // Don't care about the value, just the bit pattern
-        float fvn_seed_float = *reinterpret_cast<float*>(&fvn_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fvn_seed_float));
+        float fnv_seed_float = *reinterpret_cast<float*>(&fnv_seed);
+        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_float));
         break;
       }
       case TYPE_DOUBLE: {
         // Don't care about the value, just the bit pattern
-        double fvn_seed_double = *reinterpret_cast<double*>(&fvn_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fvn_seed_double));
+        double fnv_seed_double = *reinterpret_cast<double*>(&fnv_seed);
+        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_double));
         break;
       }
       default:
@@ -256,11 +267,10 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 // becomes the start of the next block for codegen (either the next expr or just the
 // end of the function).
 Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) {
-  // TODO: CodegenAssignNullValue() can't handle TYPE_TIMESTAMP or TYPE_DECIMAL yet
   const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_;
   for (int i = 0; i < ctxs.size(); ++i) {
     PrimitiveType type = ctxs[i]->root()->type().type;
-    if (type == TYPE_TIMESTAMP || type == TYPE_DECIMAL || type == TYPE_CHAR) return NULL;
+    if (type == TYPE_CHAR) return NULL;
   }
 
   // Get types to generate function prototype

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 33dd02b..9a5c4a4 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -82,6 +82,10 @@ class BitUtil {
     return value & ~(factor - 1);
   }
 
+  constexpr static inline bool IsPowerOf2(int64_t value) {
+    return (value & (value - 1)) == 0;
+  }
+
   /// Specialized round up and down functions for frequently used factors,
   /// like 8 (bits->bytes), 32 (bits->i32), and 64 (bits->i64).
   /// Returns the rounded up number of bytes that fit the number of bits.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/13455b5a/testdata/workloads/functional-query/queries/QueryTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/joins.test b/testdata/workloads/functional-query/queries/QueryTest/joins.test
index ebb6287..db915df 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/joins.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/joins.test
@@ -720,3 +720,13 @@ on extract(minute from t1.timestamp_col) = extract(hour from t2.timestamp_col);
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# Regression for IMPALA-3884. Exercise HashTableCtx::AssignNullValue() for
+# 128-bit TimestampValue.
+select count(*) from functional.alltypes t1 right outer join functional.decimal_tbl t2 on
+t1.timestamp_col = cast(t2.d4 as TIMESTAMP);
+---- RESULTS
+5
+---- TYPES
+BIGINT
+====


[5/6] incubator-impala git commit: IMPALA-3211: provide toolchain build id for bootstrapping

Posted by kw...@apache.org.
IMPALA-3211: provide toolchain build id for bootstrapping

Testing:
Ran a private build, which succeeded.

Change-Id: Ibcc25ae82511713d0ff05ded37ef162925f2f0fb
Reviewed-on: http://gerrit.cloudera.org:8080/4771
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/aa7741a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/aa7741a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/aa7741a5

Branch: refs/heads/master
Commit: aa7741a57bbd3c11377e29c59e1d734bcdebf602
Parents: a625701
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Oct 19 15:44:59 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 05:10:28 2016 +0000

----------------------------------------------------------------------
 bin/bootstrap_toolchain.py | 9 +++++++--
 bin/impala-config.sh       | 7 +++++++
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa7741a5/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 3f161c8..dbe53a2 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -86,9 +86,14 @@ def wget_and_unpack_package(download_path, file_name, destination, wget_no_clobb
 def download_package(destination, product, version, compiler, platform_release=None):
   remove_existing_package(destination, product, version)
 
+  toolchain_build_id = os.environ["IMPALA_TOOLCHAIN_BUILD_ID"]
   label = get_platform_release_label(release=platform_release)
-  file_name = "{0}-{1}-{2}-{3}.tar.gz".format(product, version, compiler, label)
-  url_path="/{0}/{1}-{2}/{0}-{1}-{2}-{3}.tar.gz".format(product, version, compiler, label)
+  format_params = {'product': product, 'version': version, 'compiler': compiler,
+      'label': label, 'toolchain_build_id': toolchain_build_id}
+  file_name = "{product}-{version}-{compiler}-{label}.tar.gz".format(**format_params)
+  format_params['file_name'] = file_name
+  url_path = "/{toolchain_build_id}/{product}/{version}-{compiler}/{file_name}".format(
+      **format_params)
   download_path = HOST + url_path
 
   wget_and_unpack_package(download_path, file_name, destination, True)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa7741a5/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 85bcbfc..4e848c1 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -52,6 +52,12 @@ fi
 # If true, will not call $IMPALA_HOME/bin/bootstrap_toolchain.py.
 : ${SKIP_TOOLCHAIN_BOOTSTRAP=false}
 
+# The unique build id of the toolchain to use if bootstrapping. This is generated by the
+# native-toolchain build when publishing its build artifacts. This should be changed when
+# moving to a different build of the toolchain, e.g. when a version is bumped or a
+# compile option is changed.
+: ${IMPALA_TOOLCHAIN_BUILD_ID=249-2267164200}
+
 # This flag is used in $IMPALA_HOME/cmake_modules/toolchain.cmake.
 # If it's 0, Impala will be built with the compiler in the toolchain directory.
 : ${USE_SYSTEM_GCC=0}
@@ -79,6 +85,7 @@ fi
 
 export IMPALA_TOOLCHAIN
 export SKIP_TOOLCHAIN_BOOTSTRAP
+export IMPALA_TOOLCHAIN_BUILD_ID
 export USE_SYSTEM_GCC
 export USE_GOLD_LINKER
 export IMPALA_CXX_COMPILER


[3/6] incubator-impala git commit: IMPALA-4134, IMPALA-3704: Kudu INSERT improvements

Posted by kw...@apache.org.
IMPALA-4134,IMPALA-3704: Kudu INSERT improvements

1.) IMPALA-4134: Use Kudu AUTO FLUSH
Improves performance of writes to Kudu up to 4.2x in
bulk data loading tests (load 200 million rows from
lineitem).

2.) IMPALA-3704: Improve errors on PK conflicts
The Kudu client reports an error for every PK conflict,
and all errors were being returned in the error status.
As a result, inserts/updates/deletes could return errors
with thousands errors reported. This changes the error
handling to log all reported errors as warnings and
return only the first error in the query error status.

3.) Improve the DataSink reporting of the insert stats.
The per-partition stats returned by the data sink weren't
useful for Kudu sinks. Firstly, the number of appended rows
was not being displayed in the profile. Secondly, the
'stats' field isn't populated for Kudu tables and thus was
confusing in the profile, so it is no longer printed if it
is not set in the thrift struct.

Testing: Ran local tests, including new tests to verify
the query profile insert stats. Manual cluster testing was
conducted of the AUTO FLUSH functionality, and that testing
informed the default mutation buffer value of 100MB which
was found to provide good results.

Change-Id: I5542b9a061b01c543a139e8722560b1365f06595
Reviewed-on: http://gerrit.cloudera.org:8080/4728
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/99ed6dc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/99ed6dc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/99ed6dc6

Branch: refs/heads/master
Commit: 99ed6dc67ae889eb2a45b10c97cb23f52bc83e5d
Parents: 0eaff80
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Oct 19 15:30:58 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 02:06:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                        |   5 +
 be/src/exec/hbase-table-sink.cc                 |   4 +-
 be/src/exec/hdfs-table-sink.cc                  |   4 +-
 be/src/exec/kudu-table-sink.cc                  | 141 ++++++++++++-------
 be/src/exec/kudu-table-sink.h                   |  53 ++++---
 be/src/runtime/coordinator.cc                   |  14 +-
 be/src/service/impala-beeswax-server.cc         |   4 +-
 be/src/service/query-exec-state.cc              |   2 +-
 common/thrift/ImpalaInternalService.thrift      |   4 +-
 common/thrift/ImpalaService.thrift              |   8 +-
 common/thrift/generate_error_codes.py           |   6 +
 shell/impala_client.py                          |   2 +-
 .../queries/QueryTest/kudu_crud.test            |  94 ++++++++++++-
 tests/beeswax/impala_beeswax.py                 |   4 +-
 14 files changed, 248 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c95b854..6a34543 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -142,6 +142,11 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
     } else {
       ss << partition_key << endl;
     }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index e6052cc..3d84fed 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -72,7 +72,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect insert statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_stats(TInsertStats());
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
@@ -90,7 +90,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
       batch->num_rows();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 77316d4..63bd648 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -499,7 +499,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     DCHECK(state->per_partition_status()->find(partition->partition_name) ==
         state->per_partition_status()->end());
     TInsertPartitionStatus partition_status;
-    partition_status.__set_num_appended_rows(0L);
+    partition_status.__set_num_modified_rows(0L);
     partition_status.__set_id(partition_descriptor->id());
     partition_status.__set_stats(TInsertStats());
     partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
@@ -594,7 +594,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
     // Should have been created in GetOutputPartition() when the partition was
     // initialised.
     DCHECK(it != state->per_partition_status()->end());
-    it->second.num_appended_rows += partition->num_rows;
+    it->second.num_modified_rows += partition->num_rows;
     DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 661489f..70a74a9 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -33,6 +33,8 @@
 
 DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu session. "
     "How long to wait before considering a write failed.");
+DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
+    "Kudu client buffer for mutations.");
 
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
@@ -48,6 +50,9 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
+// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
+const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
+
 KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     const vector<TExpr>& select_list_texprs,
     const TDataSink& tsink)
@@ -56,8 +61,6 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
       select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
       kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-      kudu_flush_counter_(NULL),
-      kudu_flush_timer_(NULL),
       kudu_error_counter_(NULL),
       rows_written_(NULL),
       rows_written_rate_(NULL) {
@@ -91,16 +94,14 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect write statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
-  root_status.__set_stats(TInsertStats());
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_flush_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushOperations", TUnit::UNIT);
   kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  kudu_flush_timer_ = ADD_TIMER(profile(), "KuduFlushTimer");
   rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
   rows_written_rate_ = profile()->AddDerivedCounter(
       "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
@@ -124,8 +125,40 @@ Status KuduTableSink::Open(RuntimeState* state) {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000);
+
+  // KuduSession Set* methods here and below return a status for API compatibility.
+  // As long as the Kudu client is statically linked, these shouldn't fail and thus these
+  // calls could also DCHECK status is OK for debug builds (while still returning errors
+  // for release).
   KUDU_RETURN_IF_ERROR(session_->SetFlushMode(
-      kudu::client::KuduSession::MANUAL_FLUSH), "Unable to set flush mode");
+      kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");
+
+  const int32_t buf_size = FLAGS_kudu_mutation_buffer_size;
+  if (buf_size < 1024 * 1024) {
+    return Status(strings::Substitute(
+        "Invalid kudu_mutation_buffer_size: '$0'. Must be greater than 1MB.", buf_size));
+  }
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferSpace(buf_size),
+      "Couldn't set mutation buffer size");
+
+  // Configure client memory used for buffering.
+  // Internally, the Kudu client keeps one or more buffers for writing operations. When a
+  // single buffer is flushed, it is locked (that space cannot be reused) until all
+  // operations within it complete, so it is important to have a number of buffers. In
+  // our testing, we found that allowing a total of 100MB of buffer space to provide good
+  // results; this is the default.  Then, because of some existing 8MB limits in Kudu, we
+  // want to have that total space broken up into 7MB buffers (INDIVIDUAL_BUFFER_SIZE).
+  // The mutation flush watermark is set to flush every INDIVIDUAL_BUFFER_SIZE.
+  int num_buffers = FLAGS_kudu_mutation_buffer_size / INDIVIDUAL_BUFFER_SIZE;
+  if (num_buffers == 0) num_buffers = 1;
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferFlushWatermark(1.0 / num_buffers),
+      "Couldn't set mutation buffer watermark");
+
+  // No limit on the buffer count since the settings above imply a max number of buffers.
+  // Note that the Kudu client API has a few too many knobs for configuring the size and
+  // number of these buffers; there are a few ways to accomplish similar behaviors.
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0),
+      "Couldn't set mutation buffer count");
   return Status::OK();
 }
 
@@ -135,7 +168,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
   } else if (sink_action_ == TSinkAction::UPDATE) {
     return table_->NewUpdate();
   } else {
-    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported. " << sink_action_;
+    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
+        << sink_action_;
     return table_->NewDelete();
   }
 }
@@ -145,11 +179,15 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
 
+  // Collect all write operations and apply them together so the time in Apply() can be
+  // easily timed.
+  vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
+
   int rows_added = 0;
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
-    gscoped_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       int col = kudu_table_sink_.referenced_columns.empty() ?
@@ -173,7 +211,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
         case TYPE_STRING: {
           StringValue* sv = reinterpret_cast<StringValue*>(value);
           kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetStringNoCopy(col, slice),
+          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
               "Could not add Kudu WriteOp.");
           break;
         }
@@ -216,46 +254,38 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
           return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
+    write_ops.push_back(move(write));
+  }
 
-    KUDU_RETURN_IF_ERROR(session_->Apply(write.release()),
-        "Error while applying Kudu session.");
-    ++rows_added;
+  {
+    SCOPED_TIMER(kudu_apply_timer_);
+    for (auto&& write: write_ops) {
+      KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
+      ++rows_added;
+    }
   }
+
   COUNTER_ADD(rows_written_, rows_added);
-  int64_t error_count = 0;
-  RETURN_IF_ERROR(Flush(&error_count));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
-      rows_added - error_count;
+  RETURN_IF_ERROR(CheckForErrors(state));
   return Status::OK();
 }
 
-Status KuduTableSink::Flush(int64_t* error_count) {
-  // TODO right now we always flush an entire row batch, if these are small we'll
-  // be inefficient. Consider decoupling impala's batch size from kudu's
-  kudu::Status s;
-  {
-    SCOPED_TIMER(kudu_flush_timer_);
-    COUNTER_ADD(kudu_flush_counter_, 1);
-    s = session_->Flush();
-  }
-  if (LIKELY(s.ok())) return Status::OK();
+Status KuduTableSink::CheckForErrors(RuntimeState* state) {
+  if (session_->CountPendingErrors() == 0) return Status::OK();
 
-  stringstream error_msg_buffer;
   vector<KuduError*> errors;
-
-  // Check if there are pending errors in the Kudu session. If errors overflowed the error
-  // buffer we can't be sure all errors can be ignored and fail immediately, setting 'failed'
-  // to true.
-  bool failed = false;
-  session_->GetPendingErrors(&errors, &failed);
-  if (UNLIKELY(failed)) {
-    error_msg_buffer << "Error overflow in Kudu session, "
-                     << "previous write operation might be inconsistent.\n";
+  Status status = Status::OK();
+
+  // Get the pending errors from the Kudu session. If errors overflowed the error buffer
+  // we can't be sure all errors can be ignored, so an error status will be reported.
+  bool error_overflow = false;
+  session_->GetPendingErrors(&errors, &error_overflow);
+  if (UNLIKELY(error_overflow)) {
+    status = Status("Error overflow in Kudu session.");
   }
 
   // The memory for the errors is manually managed. Iterate over all errors and delete
   // them accordingly.
-  bool first_error = true;
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
     // If the sink has the option "ignore_not_found_or_duplicate" set, duplicate key or
@@ -265,24 +295,39 @@ Status KuduTableSink::Flush(int64_t* error_count) {
         ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent()))) {
-      if (first_error) {
-        error_msg_buffer << "Error while flushing Kudu session: \n";
-        first_error = false;
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
       }
-      error_msg_buffer << e.ToString() << "\n";
-      failed = true;
+    }
+    if (e.IsNotFound()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
+          table_desc_->table_name()));
+    } else if (e.IsAlreadyPresent()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
+          table_desc_->table_name()));
+    } else {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
+          table_desc_->table_name(), e.ToString()));
     }
     delete errors[i];
   }
   COUNTER_ADD(kudu_error_counter_, errors.size());
-  if (error_count != NULL) *error_count = errors.size();
-  if (failed) return Status(error_msg_buffer.str());
-  return Status::OK();
+  return status;
 }
 
 Status KuduTableSink::FlushFinal(RuntimeState* state) {
-  // No buffered state to flush.
-  return Status::OK();
+  kudu::Status flush_status = session_->Flush();
+
+  // Flush() may return an error status but any errors will also be reported by
+  // CheckForErrors(), so it's safe to ignore and always call CheckForErrors.
+  if (!flush_status.ok()) {
+    VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
+  }
+  Status status = CheckForErrors(state);
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
+      rows_written_->value() - kudu_error_counter_->value());
+  return status;
 }
 
 void KuduTableSink::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2eeb721..fe278c5 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,17 +30,27 @@
 
 namespace impala {
 
-/// Sink that takes RowBatches and writes them into Kudu.
-/// Currently the data is sent to Kudu on Send(), i.e. the data is batched on the
-/// KuduSession until all the rows in a RowBatch are applied and then the session
-/// is flushed.
+/// Sink that takes RowBatches and writes them into a Kudu table.
 ///
-/// Kudu doesn't have transactions (yet!) so some rows may fail to write while
-/// others are successful. This sink will return an error if any of the rows fails
-/// to be written.
+/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
+/// flush records when enough data has been written (AUTO_FLUSH_BACKGROUND). This
+/// requires specifying a mutation buffer size and a buffer flush watermark percentage in
+/// the Kudu client. The mutation buffer needs to be large enough to buffer rows sent to
+/// all destination nodes because the buffer accounting is not specified per-tablet
+/// server (KUDU-1693). Tests showed that 100MB was a good default, and this is
+/// configurable via the gflag --kudu_mutation_buffer_size. The buffer flush watermark
+/// percentage is set to a value that results in Kudu flushing after 7MB is in a
+/// buffer for a particular destination (of the 100MB of the total mutation buffer space)
+/// because Kudu currently has some 8MB buffer limits.
 ///
-/// TODO Once Kudu actually implements AUTOFLUSH_BACKGROUND flush mode we should
-/// change the flushing behavior as it will likely make writes more efficient.
+/// Kudu doesn't have transactions yet, so some rows may fail to write while others are
+/// successful. The Kudu client reports errors, some of which may be considered to be
+/// expected: rows that fail to be written/updated/deleted due to a key conflict while
+/// the IGNORE option is specified, and these will not result in the sink returning an
+/// error. These errors when IGNORE is not specified, or any other kind of error
+/// reported by Kudu result in the sink returning an error status. The first non-ignored
+/// error is returned in the sink's Status. All reported errors (ignored or not) will be
+/// logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -59,7 +69,7 @@ class KuduTableSink : public DataSink {
   /// The KuduSession is flushed on each row batch.
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Does nothing. We currently flush on each Send() call.
+  /// Forces any remaining buffered operations to be flushed to Kudu.
   virtual Status FlushFinal(RuntimeState* state);
 
   /// Closes the KuduSession and the expressions.
@@ -72,12 +82,11 @@ class KuduTableSink : public DataSink {
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
-  /// Flushes the Kudu session, making sure all previous operations were committed, and handles
-  /// errors returned from Kudu. Passes the number of errors during the flush operations as an
-  /// out parameter.
-  /// Returns a non-OK status if there was an unrecoverable error. This might return an OK
-  /// status even if 'error_count' is > 0, as some errors might be ignored.
-  Status Flush(int64_t* error_count);
+  /// Checks for any errors buffered in the Kudu session, and increments
+  /// appropriate counters for ignored errors.
+  //
+  /// Returns a bad Status if there are non-ignorable errors.
+  Status CheckForErrors(RuntimeState* state);
 
   /// Used to get the KuduTableDescriptor from the RuntimeState
   TableId table_id_;
@@ -102,15 +111,15 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Counts the number of calls to KuduSession::flush().
-  RuntimeProfile::Counter* kudu_flush_counter_;
-
-  /// Aggregates the times spent in KuduSession:flush().
-  RuntimeProfile::Counter* kudu_flush_timer_;
-
   /// Total number of errors returned from Kudu.
   RuntimeProfile::Counter* kudu_error_counter_;
 
+  /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
+  /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
+  /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
+  /// rows as fast as the sink can write them.
+  RuntimeProfile::Counter* kudu_apply_timer_;
+
   /// Total number of rows written including errors.
   RuntimeProfile::Counter* rows_written_;
   RuntimeProfile::Counter* rows_written_rate_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4214d4d..0f41deb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1668,11 +1668,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     for (const PartitionStatusMap::value_type& partition:
          params.insert_exec_status.per_partition_status) {
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-      status->num_appended_rows += partition.second.num_appended_rows;
-      status->id = partition.second.id;
-      status->partition_base_dir = partition.second.partition_base_dir;
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      status->__set_num_modified_rows(
+          status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_id(partition.second.id);
+      status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+      if (partition.second.__isset.stats) {
+        if (!status->__isset.stats) status->__set_stats(TInsertStats());
+        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      }
     }
     files_to_move_.insert(
         params.insert_exec_status.files_to_move.begin(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b50499e..63fa9cd 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -523,8 +523,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
         for (const PartitionStatusMap::value_type& v:
              exec_state->coord()->per_partition_status()) {
           const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_appended[partition_status.first] =
-              partition_status.second.num_appended_rows;
+          insert_result->rows_modified[partition_status.first] =
+              partition_status.second.num_modified_rows;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 69473c5..3dc9d6c 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -956,7 +956,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
     for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_appended_rows;
+      total_num_rows_inserted += p.second.num_modified_rows;
     }
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 736de34..e9c3119 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -422,8 +422,8 @@ struct TInsertPartitionStatus {
   // query). See THdfsTable.partitions.
   1: optional i64 id
 
-  // The number of rows appended to this partition
-  2: optional i64 num_appended_rows
+  // The number of rows modified in this partition
+  2: optional i64 num_modified_rows
 
   // Detailed statistics gathered by table writers for this partition
   3: optional TInsertStats stats

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 794c140..573709b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -247,10 +247,10 @@ enum TImpalaQueryOptions {
 
 // The summary of an insert.
 struct TInsertResult {
-  // Number of appended rows per modified partition. Only applies to HDFS tables.
-  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
-  // root in an unpartitioned table being the empty string.
-  1: required map<string, i64> rows_appended
+  // Number of modified rows per partition. Only applies to HDFS and Kudu tables.
+  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with
+  // the root in an unpartitioned table being the empty string.
+  1: required map<string, i64> rows_modified
 }
 
 // Response from a call to PingImpalaService

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index ae338d5..f03b073 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -292,6 +292,12 @@ error_codes = (
 
   ("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
    "available."),
+
+  ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
+
+  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+
+  ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 0d1c835..f57a015 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -354,7 +354,7 @@ class ImpalaClient(object):
     if status != RpcStatus.OK:
       raise RPCException()
 
-    num_rows = sum([int(k) for k in insert_result.rows_appended.values()])
+    num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
     return num_rows
 
   def close_query(self, last_query_handle, query_handle_closed=False):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index a06d203..e4f3205 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -29,10 +29,15 @@ insert into tdata values
 (3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 3.*
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
+# TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -48,6 +53,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 # Try updating a varchar col. with a value that is bigger than it's size (truncated).
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -58,10 +65,11 @@ select * from tdata
 ---- TYPES
 INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
-====
 ---- QUERY
 update tdata set valb=false where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -75,6 +83,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set vali=43 where id > 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -88,6 +98,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -104,6 +116,8 @@ insert into tdata values
 (120, "she", cast(0.0 as float), 99, cast('f' as VARCHAR(20)), true)
 ---- RESULTS
 : 2
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -119,6 +133,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name=null where id = 40
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -133,6 +149,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 update tdata set name='he' where id = 40
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ---- RESULTS
 ====
 ---- QUERY
@@ -152,6 +170,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 insert into tdata values (320, '', 2.0, 932, cast('' as VARCHAR(20)), false)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -169,6 +189,10 @@ create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint,
 ====
 ---- QUERY
 insert into ignore_column_case values (1, 'Martin', 1.0, 10);
+---- RESULTS
+: 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -182,36 +206,44 @@ insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Already present
 ====
 ---- QUERY
 insert ignore into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
 ====
 ---- QUERY
--- Updating the same record twice
+-- Updating the same record many times: cross join produces 7 identical updates
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
--- Does not exercise any error path in the sink because updating the same record twice
--- is valid. Makes sure IGNORE works.
+-- Does not exercise any error path in the sink because updating the same record multiple
+-- times is valid. Makes sure IGNORE works.
 update ignore a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
 -- Using a cross join to generate the same delete twice. After the first delete succeeded,
 -- trying to execute the second delete will fail because the record does not exist.
 delete a from tdata a, tdata b where a.id = 666
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Not found: key not found
 ====
 ---- QUERY
 -- Re-insert the data
@@ -223,6 +255,8 @@ insert into tdata values
 ---- QUERY
 delete ignore a from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -242,6 +276,8 @@ insert into impala_3454 values
 ---- QUERY
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from impala_3454
@@ -250,3 +286,49 @@ select * from impala_3454
 ---- TYPES
 TINYINT,BIGINT
 ====
+---- QUERY
+CREATE TABLE kudu_test_tbl PRIMARY KEY(id)
+DISTRIBUTE BY RANGE(id) SPLIT ROWS ((100000000))
+STORED AS KUDU AS
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+'Inserted 100 row(s)'
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 100.*
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+: 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
+====
+---- QUERY
+INSERT INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- CATCH
+Kudu error(s) reported, first error: Already present: key already present
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes;
+---- RESULTS
+: 7200
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7200.*
+====
+---- QUERY
+# Test a larger UPDATE
+UPDATE kudu_test_tbl SET int_col = -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====
+---- QUERY
+# Test a larger DELETE
+DELETE FROM kudu_test_tbl WHERE id > -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e0f5d55..7ed411e 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -416,8 +416,8 @@ class ImpalaBeeswaxClient(object):
     """Executes an insert query"""
     result = self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
     # The insert was successful
-    num_rows = sum(map(int, result.rows_appended.values()))
-    data = ["%s: %s" % row for row in result.rows_appended.iteritems()]
+    num_rows = sum(map(int, result.rows_modified.values()))
+    data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
     exec_result = ImpalaBeeswaxResult(success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result