You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/03/31 23:35:02 UTC

[01/12] impala git commit: IMPALA-6694: fix "buffer pool" child profile order

Repository: impala
Updated Branches:
  refs/heads/2.x fc0af7f2a -> 0197b17fa


IMPALA-6694: fix "buffer pool" child profile order

The bug is that child profiles can be re-ordered when being sent between
an executor and a coordinator. This occurs if child profile A is present
in one update, then another child profile B is inserted at a position
before A and is sent to the coordinator in a subsequent update. The
algorithm for merging profiles did not preserve the order in that case.

The algorithm is fixed to preserve order when the relative order of
child profiles is consistent between all updates.

Testing:
Added a targeted unit test.

Change-Id: I230f0673edf20a846fdb13191b7a292d329c1bb8
Reviewed-on: http://gerrit.cloudera.org:8080/9749
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/792dcbad
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/792dcbad
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/792dcbad

Branch: refs/heads/2.x
Commit: 792dcbad83811d160627a636f2f83e17a559c68f
Parents: 3855cb0
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 21 17:01:45 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 28 23:59:02 2018 +0000

----------------------------------------------------------------------
 be/src/util/runtime-profile-test.cc | 72 ++++++++++++++++++++++++++++++++
 be/src/util/runtime-profile.cc      | 38 ++++++++++++++---
 2 files changed, 104 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/792dcbad/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index d3394b2..0955203 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -16,7 +16,9 @@
 // under the License.
 
 #include <stdlib.h>
+#include <algorithm>
 #include <iostream>
+
 #include <boost/bind.hpp>
 
 #include "common/object-pool.h"
@@ -222,6 +224,76 @@ TEST(CountersTest, MergeAndUpdate) {
   profile2->PrettyPrint(&dummy);
 }
 
+// Regression test for IMPALA-6694 - child order isn't preserved if a child
+// is prepended between updates.
+TEST(CountersTest, MergeAndUpdateChildOrder) {
+  ObjectPool pool;
+  // Add Child2 first.
+  RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent");
+  RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2");
+  profile1->AddChild(p1_child2);
+  TRuntimeProfileTree tprofile1_v1, tprofile1_v2, tprofile1_v3;
+  profile1->ToThrift(&tprofile1_v1);
+
+  // Update averaged and deserialized profiles from the serialized profile.
+  RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true);
+  RuntimeProfile* deserialized_profile = RuntimeProfile::Create(&pool, "Parent");
+  averaged_profile->UpdateAverage(profile1);
+  deserialized_profile->Update(tprofile1_v1);
+
+  std::vector<RuntimeProfile*> tmp_children;
+  averaged_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(1, tmp_children.size());
+  EXPECT_EQ("Child2", tmp_children[0]->name());
+  deserialized_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(1, tmp_children.size());
+  EXPECT_EQ("Child2", tmp_children[0]->name());
+
+  // Prepend Child1 and update profiles.
+  RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1");
+  profile1->PrependChild(p1_child1);
+  profile1->ToThrift(&tprofile1_v2);
+  averaged_profile->UpdateAverage(profile1);
+  deserialized_profile->Update(tprofile1_v2);
+
+  averaged_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(2, tmp_children.size());
+  EXPECT_EQ("Child1", tmp_children[0]->name());
+  EXPECT_EQ("Child2", tmp_children[1]->name());
+  deserialized_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(2, tmp_children.size());
+  EXPECT_EQ("Child1", tmp_children[0]->name());
+  EXPECT_EQ("Child2", tmp_children[1]->name());
+
+  // Test that changes in order of children is handled gracefully by preserving the
+  // order from the previous update.
+  profile1->SortChildren([](
+        const pair<RuntimeProfile*, bool>& p1, const pair<RuntimeProfile*, bool>& p2) {
+    return p1.first->name() > p2.first->name();
+  });
+  profile1->GetChildren(&tmp_children);
+  EXPECT_EQ("Child2", tmp_children[0]->name());
+  EXPECT_EQ("Child1", tmp_children[1]->name());
+  profile1->ToThrift(&tprofile1_v3);
+  averaged_profile->UpdateAverage(profile1);
+  deserialized_profile->Update(tprofile1_v2);
+
+  // The previous order of children that were already present is preserved.
+  averaged_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(2, tmp_children.size());
+  EXPECT_EQ("Child1", tmp_children[0]->name());
+  EXPECT_EQ("Child2", tmp_children[1]->name());
+  deserialized_profile->GetChildren(&tmp_children);
+  EXPECT_EQ(2, tmp_children.size());
+  EXPECT_EQ("Child1", tmp_children[0]->name());
+  EXPECT_EQ("Child2", tmp_children[1]->name());
+
+  // Make sure we can print the profiles.
+  stringstream dummy;
+  averaged_profile->PrettyPrint(&dummy);
+  deserialized_profile->PrettyPrint(&dummy);
+}
+
 TEST(CountersTest, HighWaterMarkCounters) {
   ObjectPool pool;
   RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");

http://git-wip-us.apache.org/repos/asf/impala/blob/792dcbad/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index c057cac..5f205a7 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -202,19 +202,33 @@ void RuntimeProfile::UpdateAverage(RuntimeProfile* other) {
   {
     lock_guard<SpinLock> l(children_lock_);
     lock_guard<SpinLock> m(other->children_lock_);
-    // Recursively merge children with matching names
+    // Recursively merge children with matching names.
+    // Track the current position in the vector so we preserve the order of children
+    // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
+    // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
+    // then this code makes sure that children_ is [A, B, C, D] afterwards.
+    ChildVector::iterator insert_pos = children_.begin();
     for (int i = 0; i < other->children_.size(); ++i) {
       RuntimeProfile* other_child = other->children_[i].first;
       ChildMap::iterator j = child_map_.find(other_child->name_);
       RuntimeProfile* child = NULL;
       if (j != child_map_.end()) {
         child = j->second;
+        // Search forward until the insert position is either at the end of the vector
+        // or after this child. This preserves the order if the relative order of
+        // children in all updates is consistent.
+        bool found_child = false;
+        while (insert_pos != children_.end() && !found_child) {
+          found_child = insert_pos->first == child;
+          ++insert_pos;
+        }
       } else {
         child = Create(pool_, other_child->name_, true);
         child->metadata_ = other_child->metadata_;
         bool indent_other_child = other->children_[i].second;
         child_map_[child->name_] = child;
-        children_.push_back(make_pair(child, indent_other_child));
+        insert_pos = children_.insert(insert_pos, make_pair(child, indent_other_child));
+        ++insert_pos;
       }
       child->UpdateAverage(other_child);
     }
@@ -329,6 +343,11 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
   ++*idx;
   {
     lock_guard<SpinLock> l(children_lock_);
+    // Track the current position in the vector so we preserve the order of children
+    // if children are added after the first Update()/UpdateAverage() call (IMPALA-6694).
+    // E.g. if the first update sends [B, D] and the second update sends [A, B, C, D],
+    // then this code makes sure that children_ is [A, B, C, D] afterwards.
+    ChildVector::iterator insert_pos = children_.begin();
     // Update children with matching names; create new ones if they don't match.
     for (int i = 0; i < node.num_children; ++i) {
       const TRuntimeProfileNode& tchild = nodes[*idx];
@@ -336,11 +355,20 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       RuntimeProfile* child = NULL;
       if (j != child_map_.end()) {
         child = j->second;
+        // Search forward until the insert position is either at the end of the vector
+        // or after this child. This preserves the order if the relative order of
+        // children in all updates is consistent.
+        bool found_child = false;
+        while (insert_pos != children_.end() && !found_child) {
+          found_child = insert_pos->first == child;
+          ++insert_pos;
+        }
       } else {
         child = Create(pool_, tchild.name);
         child->metadata_ = tchild.metadata;
         child_map_[tchild.name] = child;
-        children_.push_back(make_pair(child, tchild.indent));
+        insert_pos = children_.insert(insert_pos, make_pair(child, tchild.indent));
+        ++insert_pos;
       }
       child->Update(nodes, idx);
     }
@@ -462,9 +490,7 @@ RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent,
 void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {
   children->clear();
   lock_guard<SpinLock> l(children_lock_);
-  for (ChildMap::iterator i = child_map_.begin(); i != child_map_.end(); ++i) {
-    children->push_back(i->second);
-  }
+  for (const auto& entry : children_) children->push_back(entry.first);
 }
 
 void RuntimeProfile::GetAllChildren(vector<RuntimeProfile*>* children) {


[07/12] impala git commit: IMPALA-5384, part 1: introduce DmlExecState

Posted by ph...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ea88d73..9b1c2f5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -49,6 +49,7 @@
 #include "rpc/thrift-thread.h"
 #include "rpc/thrift-util.h"
 #include "runtime/client-cache.h"
+#include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
@@ -59,6 +60,7 @@
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 23d4687..2af89fd 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -34,7 +34,6 @@
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-server.h"
 #include "common/status.h"
-#include "service/frontend.h"
 #include "service/query-options.h"
 #include "util/condition-variable.h"
 #include "util/metrics.h"
@@ -43,8 +42,6 @@
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "runtime/coordinator.h"
-#include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/types.h"
 #include "statestore/statestore-subscriber.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 7ff44a8..19afcd5 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -29,6 +29,7 @@
 #include "util/metrics.h"
 #include "util/openssl-util.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"


[10/12] impala git commit: IMPALA-6731: Move execnet Python dependency to stage 2

Posted by ph...@apache.org.
IMPALA-6731: Move execnet Python dependency to stage 2

It seems that execnet also cannot be installed together with
setuptools-scm if only a local mirror and index are available
(similar to https://github.com/pywebhdfs/pywebhdfs/issues/52).

Testing: Observed that execnet failed to install during
bootstrap_toolchain.py on a CentOS 6.4 EC2 instanc at 5:02pm (within the
brownout period). With this change, bootstrap_toolchain.py succeeded.

Change-Id: Ic949edcc03f0e068bdd84b6ede487e64dcf2439b
Reviewed-on: http://gerrit.cloudera.org:8080/9850
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f69fbcd1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f69fbcd1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f69fbcd1

Branch: refs/heads/2.x
Commit: f69fbcd1d7b5f4296fde38ed05153b257bdf0a6f
Parents: 382d779
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Mar 28 16:58:18 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 07:17:25 2018 +0000

----------------------------------------------------------------------
 infra/python/deps/requirements.txt        | 3 ---
 infra/python/deps/stage2-requirements.txt | 3 +++
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f69fbcd1/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index bea16f4..d55bc19 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -39,9 +39,6 @@ Flask == 0.10.1
   MarkupSafe == 0.23
   Werkzeug == 0.11.3
   itsdangerous == 0.24
-hdfs == 2.0.2
-  docopt == 0.6.2
-  execnet == 1.4.0
 kazoo == 2.2.1
 ordereddict == 1.1
 pexpect == 3.3

http://git-wip-us.apache.org/repos/asf/impala/blob/f69fbcd1/infra/python/deps/stage2-requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/stage2-requirements.txt b/infra/python/deps/stage2-requirements.txt
index eda2cd3..c7c947c 100644
--- a/infra/python/deps/stage2-requirements.txt
+++ b/infra/python/deps/stage2-requirements.txt
@@ -27,6 +27,9 @@ pytest == 2.9.2
   pytest-random == 0.02
   pytest-runner == 4.2
   pytest-xdist == 1.17.1
+hdfs == 2.0.2
+  docopt == 0.6.2
+  execnet == 1.4.0
 
 # Requires pbr
 pywebhdfs == 0.3.2


[12/12] impala git commit: Revert "IMPALA-6747: Automate diagnostics collection."

Posted by ph...@apache.org.
Revert "IMPALA-6747: Automate diagnostics collection."

A couple of things donot work in python2.6
 -- Multiple with statements in the same context
 -- shutil.make_archive()

I need a little more time to test the fix with python2.6.
Meanwhile, reverting this to unblock others. I'll resubmit
the fix when I'm confident that it works with python2.6

This reverts commit 2883c9950026db74240a69ab07e867810b8547b0.

Change-Id: I221ede9d5eb4d89ea20992cc27a8284803af3223
Reviewed-on: http://gerrit.cloudera.org:8080/9872
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Michael Ho <kw...@cloudera.com>
(cherry picked from commit cf4f314922f13fbf54c6d7300ceaa2229bf5916a)
Reviewed-on: http://gerrit.cloudera.org:8080/9875
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Tianyi Wang <tw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0197b17f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0197b17f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0197b17f

Branch: refs/heads/2.x
Commit: 0197b17fac88aab14f714fcc838a8d9a256a133f
Parents: 25b2344
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Fri Mar 30 12:43:07 2018 -0700
Committer: Tianyi Wang <tw...@cloudera.com>
Committed: Fri Mar 30 22:21:19 2018 +0000

----------------------------------------------------------------------
 bin/diagnostics/__init__.py            |   0
 bin/diagnostics/collect_diagnostics.py | 518 ----------------------------
 bin/diagnostics/collect_shared_libs.sh |  52 ---
 bin/rat_exclude_files.txt              |   1 -
 tests/unittests/test_command.py        |  49 ---
 5 files changed, 620 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0197b17f/bin/diagnostics/__init__.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/__init__.py b/bin/diagnostics/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/impala/blob/0197b17f/bin/diagnostics/collect_diagnostics.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_diagnostics.py b/bin/diagnostics/collect_diagnostics.py
deleted file mode 100644
index 6abc30a..0000000
--- a/bin/diagnostics/collect_diagnostics.py
+++ /dev/null
@@ -1,518 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import datetime
-import errno
-import getpass
-import glob
-import logging
-import math
-import os
-import shutil
-import subprocess
-import sys
-import time
-import tempfile
-import traceback
-
-from collections import namedtuple
-from struct import Struct
-from threading import Timer
-
-# This script is for automating the collection of following diagnostics from a host
-# running an Impala service daemon (catalogd/statestored/impalad). Following diagnostics
-# are supported.
-#
-# 1. Native core dump (+ shared libs)
-# 2. GDB/Java thread dump (pstack + jstack)
-# 3. Java heap dump (jmap)
-# 4. Minidumps (using breakpad)
-# 5. Profiles
-#
-# Dependencies:
-# 1. gdb package should be installed to collect native thread stacks/coredump. The binary
-#    location is picked up from the system path. In case of pstacks, the script falls back
-#    to the breakpad minidumps if the 'pstack' binary is not in system path.
-# 2. jstack/jmap from a JRE/JDK. Default location is picked up from system path but can be
-#    overriden with --java_home PATH_TO_JAVA_HOME.
-# 3. Mindumps are collected by sending a SIGUSR1 signal to the Impala process. Impala
-#    versions without full breakpad support (<= release 2.6) will reliably crash if
-#    we attempt to do that since those versions do not have the corresponding signal
-#    handler. Hence it is suggested to run this script only on releases 2.7 and later.
-#
-# Usage: python collect_diagnostics.py --help
-#
-# Few example usages:
-#
-# Collect 3 jstacks, pstacks from an impalad process 3s apart.
-#  python collect_diagnostics.py --pid $(pidof impalad) --stacks 3 3
-#
-# Collect core dump and a Java heapdump from the catalogd process
-#  python collect_diagnostics.py --pid $(pidof impalad) --jmap --gcore
-#
-# Collect 5 breakpad minidumps from a statestored process 5s apart.
-#  python collect_diagnostics.py --pid $(pidof statestored) --minidumps 5 5
-#      --minidumps_dir /var/log/statestored/minidumps
-#
-#
-class Command(object):
-  """Wrapper around subprocess.Popen() that is canceled after a configurable timeout."""
-  def __init__(self, cmd, timeout=30):
-    self.cmd = cmd
-    self.timeout = timeout
-    self.child_killed_by_timeout = False
-
-  def run(self, cmd_stdin=None, cmd_stdout=subprocess.PIPE):
-    """Runs the command 'cmd' by setting the appropriate stdin/out. The command is killed
-    if hits a timeout (controlled by self.timeout)."""
-    cmd_string = " ".join(self.cmd)
-    logging.info("Starting command %s with a timeout of %s"
-        % (cmd_string, str(self.timeout)))
-    self.child = subprocess.Popen(self.cmd, stdin=cmd_stdin, stdout=cmd_stdout)
-    timer = Timer(self.timeout, self.kill_child)
-    try:
-      timer.start()
-      # self.stdout is set to None if cmd_stdout is anything other than PIPE. The actual
-      # stdout is written to the file corresponding to cmd_stdout.
-      self.stdout = self.child.communicate()[0]
-      if self.child.returncode == 0:
-        logging.info("Command finished successfully: " + cmd_string)
-      else:
-        cmd_status = "timed out" if self.child_killed_by_timeout else "failed"
-        logging.error("Command %s: %s" % (cmd_status, cmd_string))
-      return self.child.returncode
-    finally:
-      timer.cancel()
-    return -1
-
-  def kill_child(self):
-    """Kills the running command (self.child)."""
-    self.child_killed_by_timeout = True
-    self.child.kill()
-
-class ImpalaDiagnosticsHandler(object):
-  IMPALA_PROCESSES = ["impalad", "catalogd", "statestored"]
-  OUTPUT_DIRS_TO_CREATE = ["stacks", "gcores", "jmaps", "profiles",
-      "shared_libs", "minidumps"]
-  MINIDUMP_HEADER = namedtuple("MDRawHeader", "signature version stream_count \
-      stream_directory_rva checksum time_date_stamp flags")
-
-  def __init__(self, args):
-    """Initializes the state by setting the paths of required executables."""
-    self.args = args
-    if args.pid <= 0:
-      return
-
-    self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
-    # Name of the Impala process for which diagnostics should be collected.
-    self.target_process_name = self.get_target_process_name()
-
-    self.java_home = self.get_java_home_from_env()
-    if not self.java_home and args.java_home:
-      self.java_home = os.path.abspath(args.java_home)
-    self.jstack_cmd = os.path.join(self.java_home, "bin/jstack")
-    self.java_cmd = os.path.join(self.java_home, "bin/java")
-    self.jmap_cmd = os.path.join(self.java_home, "bin/jmap")
-
-    self.gdb_cmd = self.get_command_from_path("gdb")
-    self.gcore_cmd = self.get_command_from_path("gcore")
-    self.pstack_cmd = self.get_command_from_path("pstack")
-
-  def create_output_dir_structure(self):
-    """Creates the skeleton directory structure for the diagnostics output collection."""
-    self.collection_root_dir = tempfile.mkdtemp(prefix="impala-diagnostics-%s" %
-        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S-"),
-        dir=os.path.abspath(self.args.output_dir))
-    for dirname in self.OUTPUT_DIRS_TO_CREATE:
-      os.mkdir(os.path.join(self.collection_root_dir, dirname))
-
-  def get_command_from_path(self, cmd):
-    """Returns the path to a given command executable, if one exists in the
-    system PATH."""
-    for path in os.environ["PATH"].split(os.pathsep):
-      cmd_path = os.path.join(path, cmd)
-      if os.access(cmd_path, os.X_OK):
-        return cmd_path
-    return ""
-
-  def get_target_process_name(self):
-    """Returns the process name of the target process for which diagnostics
-    should be collected."""
-    try:
-      return open("/proc/%s/comm" % self.args.pid).read().strip()
-    except Exception:
-      logging.exception("Failed to get target process name.")
-      return ""
-
-  def get_num_child_proc(self, name):
-    """Returns number of processes with the given name and target Impala pid
-    as parent."""
-    cmd = Command(["pgrep", "-c", "-P", str(self.args.pid), name])
-    cmd.run()
-    return int(cmd.stdout.strip())
-
-  def get_java_home_from_env(self):
-    """Returns JAVA_HOME set in the env of the target process."""
-    try:
-      envs = open("/proc/%s/environ" % self.args.pid).read().split("\0")
-      for s in envs:
-        k, v = s.split("=", 1)
-        if k == "JAVA_HOME":
-          return v
-    except Exception:
-      logging.exception("Failed to determine JAVA_HOME from proc env.")
-      return ""
-
-  def get_free_disk_space_gbs(self, path):
-    """Returns free disk space (in GBs) of the partition hosting the given path."""
-    s = os.statvfs(path)
-    return (s.f_bsize * s.f_bavail)/(1024.0 * 1024.0 * 1024.0)
-
-  def get_minidump_create_timestamp(self, minidump_path):
-    """Returns the unix timestamp of the minidump create time. It is extracted from
-    the minidump header."""
-    # Read the minidump's header to extract the create time stamp. More information about
-    # the mindump header format can be found here: https://goo.gl/uxKZVe
-    #
-    # typedef struct {
-    #   uint32_t  signature;
-    #   uint32_t  version;
-    #   uint32_t  stream_count;
-    #   MDRVA     stream_directory_rva;  /* A |stream_count|-sized array of
-    #                                     * MDRawDirectory structures. */
-    #   uint32_t  checksum;              /* Can be 0.  In fact, that's all that's
-    #                                     * been found in minidump files. */
-    #   uint32_t  time_date_stamp;       /* time_t */
-    #   uint64_t  flags;
-    # } MDRawHeader;  /* MINIDUMP_HEADER */
-    s = Struct("IIIiIIQ")
-    data = open(minidump_path, "rb").read(s.size)
-    header = self.MINIDUMP_HEADER(*s.unpack_from(data))
-    return header.time_date_stamp
-
-  def wait_for_minidump(self):
-    """Minidump collection is async after sending the SIGUSR1 signal. So this method
-    waits till it is written to the disk. Since minidump forks off a new process from
-    the parent Impala process we need to wait till the forked process exits.
-    Returns after 30s to prevent infinite waiting. Should be called after sending the
-    SIGUSR1 signal to the Impala process."""
-    MAX_WAIT_TIME_S = 30
-    start_time = time.time()
-    while time.time() < start_time + MAX_WAIT_TIME_S:
-      # Sleep for a bit to ensure that the process fork to write minidump has started.
-      # Otherwise the subsequent check on the process count could pass even when the
-      # fork didn't succeed. This sleep reduces the likelihood of such race.
-      time.sleep(1)
-      if self.get_num_child_proc(self.target_process_name) == 1:
-        break
-    return
-
-  def validate_args(self):
-    """Returns True if self.args are valid, false otherwise"""
-    if self.args.pid <= 0:
-      logging.critical("Invalid PID provided.")
-      return False
-
-    if self.target_process_name not in self.IMPALA_PROCESSES:
-      logging.critical("No valid Impala process with the given PID %s" % str(self.args.pid))
-      return False
-
-    if not self.java_home:
-      logging.critical("JAVA_HOME could not be inferred from process env.\
-          Please specify --java_home.")
-      return False
-
-    if self.args.jmap and not os.path.exists(self.jmap_cmd):
-      logging.critical("jmap binary not found, required to collect a Java heap dump.")
-      return False
-
-    if self.args.gcore and not os.path.exists(self.gcore_cmd):
-      logging.critical("gcore binary not found, required to collect a core dump.")
-      return False
-
-    if self.args.profiles_dir and not os.path.isdir(self.args.profiles_dir):
-      logging.critical("No valid profiles directory at path: %s" % self.args.profiles_dir)
-      return False
-
-    return True
-
-  def collect_thread_stacks(self):
-    """Collects jstack/jstack-m/pstack for the given pid in that order. pstack collection
-    falls back to minidumps if pstack binary is missing from the system path. Minidumps
-    are collected by sending a SIGUSR1 to the Impala process and then archiving the
-    contents of the minidump directory. The number of times stacks are collected and the
-    sleep time between the collections are controlled by --stacks argument."""
-    stacks_count, stacks_interval_secs = self.args.stacks
-    if stacks_count <= 0 or stacks_interval_secs < 0:
-      return
-
-    # Skip jstack collection if the jstack binary does not exist.
-    skip_jstacks = not os.path.exists(self.jstack_cmd)
-    if skip_jstacks:
-      logging.info("Skipping jstack collection since jstack binary couldn't be located.")
-
-    # Fallback to breakpad minidump collection if pstack binaries are missing.
-    fallback_to_minidump = False
-    if not self.pstack_cmd:
-      # Fall back to collecting a minidump if pstack is not installed.
-      if not os.path.exists(self.args.minidumps_dir):
-        logging.info("Skipping pstacks since pstack binary couldn't be located. Provide "
-            + "--minidumps_dir for collecting minidumps instead.")
-        # At this point, we can't proceed since we have nothing to collect.
-        if skip_jstacks:
-          return
-      else:
-        fallback_to_minidump = True;
-        logging.info("Collecting breakpad minidumps since pstack/gdb binaries are " +
-            "missing.")
-
-    stacks_dir = os.path.join(self.collection_root_dir, "stacks")
-    # Populate the commands to run in 'cmds_to_run' depending on what kinds of thread
-    # stacks to collect. Each entry is a tuple of form
-    # (Command, stdout_prefix, is_minidump). 'is_minidump' tells whether the command
-    # is trying to trigger a minidump collection.
-    cmds_to_run = []
-    if not skip_jstacks:
-      cmd_args = [self.jstack_cmd, str(self.args.pid)]
-      cmds_to_run.append((Command(cmd_args, self.args.timeout), "jstack", False))
-      # Collect mixed-mode jstack, contains native stack frames.
-      cmd_args_mixed_mode = [self.jstack_cmd, "-m", str(self.args.pid)]
-      cmds_to_run.append(
-          (Command(cmd_args_mixed_mode, self.args.timeout), "jstack-m", False))
-
-    if fallback_to_minidump:
-      cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
-      cmds_to_run.append((Command(cmd_args, self.args.timeout), None, True))
-    elif self.pstack_cmd:
-      cmd_args = [self.pstack_cmd, str(self.args.pid)]
-      cmds_to_run.append((Command(cmd_args, self.args.timeout), "pstack", False))
-
-    collection_start_ts = time.time()
-    for i in xrange(stacks_count):
-      for cmd, file_prefix, is_minidump in cmds_to_run:
-        if file_prefix:
-          stdout_file = os.path.join(stacks_dir, file_prefix + "-" + str(i) + ".txt")
-          with open(stdout_file, "w") as output:
-            cmd.run(cmd_stdout=output)
-        else:
-          cmd.run()
-          # Incase of minidump collection, wait for it to be written.
-          if is_minidump:
-            self.wait_for_minidump()
-      time.sleep(stacks_interval_secs)
-
-    # Copy minidumps if required.
-    if fallback_to_minidump:
-      minidump_out_dir =  os.path.join(self.collection_root_dir, "minidumps")
-      self.copy_minidumps(minidump_out_dir, collection_start_ts);
-
-  def collect_minidumps(self):
-    """Collects minidumps on the Impala process based on argument --minidumps. The
-    minidumps are collected by sending a SIGUSR1 signal to the Impala process and then
-    the resulting minidumps are copied to the target directory."""
-    minidump_count, minidump_interval_secs = self.args.minidumps
-    if minidump_count <= 0 or minidump_interval_secs < 0:
-      return
-    # Impala process writes a minidump when it encounters a SIGUSR1.
-    cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
-    cmd = Command(cmd_args, self.args.timeout)
-    collection_start_ts = time.time()
-    for i in xrange(minidump_count):
-      cmd.run()
-      self.wait_for_minidump()
-      time.sleep(minidump_interval_secs)
-    out_dir = os.path.join(self.collection_root_dir, "minidumps")
-    self.copy_minidumps(out_dir, collection_start_ts);
-
-  def copy_minidumps(self, target, start_ts):
-    """Copies mindumps with create time >= start_ts to 'target' directory."""
-    logging.info("Copying minidumps from %s to %s with ctime >= %s"
-        % (self.args.minidumps_dir, target, start_ts))
-    for filename in glob.glob(os.path.join(self.args.minidumps_dir, "*.dmp")):
-      try:
-        minidump_ctime = self.get_minidump_create_timestamp(filename)
-        if minidump_ctime >= math.floor(start_ts):
-          shutil.copy2(filename, target)
-        else:
-          logging.info("Ignored mindump: %s ctime: %s" % (filename, minidump_ctime))
-      except Exception:
-        logging.exception("Error processing minidump at path: %s. Skipping it." % filename)
-
-  def collect_java_heapdump(self):
-    """Generates the Java heap dump of the Impala process using the 'jmap' command."""
-    if not self.args.jmap:
-      return
-    jmap_dir = os.path.join(self.collection_root_dir, "jmaps")
-    out_file = os.path.join(jmap_dir, self.target_process_name + "_heap.bin")
-    # jmap command requires it to be run as the process owner.
-    # Command: jmap -dump:format=b,file=<outfile> <pid>
-    cmd_args = [self.jmap_cmd, "-dump:format=b,file=" + out_file, str(self.args.pid)]
-    Command(cmd_args, self.args.timeout).run()
-
-  def collect_native_coredump(self):
-    """Generates the core dump of the Impala process using the 'gcore' command"""
-    if not self.args.gcore:
-      return
-    # Command: gcore -o <outfile> <pid>
-    gcore_dir = os.path.join(self.collection_root_dir, "gcores")
-    out_file_name = self.target_process_name + "-" +\
-        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + ".core"
-    out_file = os.path.join(gcore_dir, out_file_name)
-    cmd_args = [self.gcore_cmd, "-o", out_file, str(self.args.pid)]
-    Command(cmd_args, self.args.timeout).run()
-
-  def collect_query_profiles(self):
-    """Collects Impala query profiles from --profiles_dir. Enforces an uncompressed limit
-    of --profiles_max_size_limit bytes on the copied profile logs."""
-    if not self.args.profiles_dir:
-      return
-    out_dir = os.path.join(self.collection_root_dir, "profiles")
-    # Hardcoded in Impala
-    PROFILE_LOG_FILE_PATTERN = "impala_profile_log_1.1-*";
-    logging.info("Collecting profile data, limiting size to %f GB" %
-        (self.args.profiles_max_size_limit/(1024 * 1024 * 1024)))
-
-    profiles_path = os.path.join(self.args.profiles_dir, PROFILE_LOG_FILE_PATTERN)
-    # Sort the profiles by creation time and copy the most recent ones in that order.
-    sorted_profiles =\
-        sorted(glob.iglob(profiles_path), key=os.path.getctime, reverse=True)
-    profile_size_included_so_far = 0
-    for profile_path in sorted_profiles:
-      try:
-        file_size = os.path.getsize(profile_path)
-        if profile_size_included_so_far + file_size > self.args.profiles_max_size_limit:
-          # Copying the whole file violates profiles_max_size_limit. Copy a part of it.
-          # Profile logs are newline delimited with a single profile per line.
-          num_bytes_to_copy =\
-              self.args.profiles_max_size_limit - profile_size_included_so_far
-          file_name = os.path.basename(profile_path)
-          copied_bytes = 0
-          with open(profile_path, "rb") as in_file,\
-              open(os.path.join(out_dir, file_name), "wb") as out_file:
-            for line in in_file.readlines():
-              if copied_bytes + len(line) > num_bytes_to_copy:
-                break
-              out_file.write(line)
-              copied_bytes += len(line)
-          return
-        profile_size_included_so_far += file_size
-        shutil.copy2(profile_path, out_dir)
-      except:
-        logging.exception("Encountered an error while collecting profile %s. Skipping it."
-            % profile_path)
-
-  def collect_shared_libs(self):
-    """Collects shared libraries loaded by the target Impala process."""
-    # Shared libs are collected if either of core dump or minidumps are enabled.
-    if not (self.args.gcore or self.args.minidumps_dir):
-      return
-    out_dir = os.path.join(self.collection_root_dir, "shared_libs")
-
-    script_path = os.path.join(self.script_dir, "collect_shared_libs.sh")
-    cmd_args = [script_path, self.gdb_cmd, str(self.args.pid), out_dir]
-    Command(cmd_args, self.args.timeout).run()
-
-  def cleanup(self):
-    """Cleans up the directory to which diagnostics were written."""
-    shutil.rmtree(self.collection_root_dir, ignore_errors=True)
-
-  def get_diagnostics(self):
-    """Calls all collect_*() methods to collect diagnostics. Returns True if no errors
-    were encountered during diagnostics collection, False otherwise."""
-    if not self.validate_args():
-      return False
-    logging.info("Using JAVA_HOME: %s" % self.java_home)
-    self.create_output_dir_structure()
-    logging.info("Free disk space: %.2fGB" %
-        self.get_free_disk_space_gbs(self.collection_root_dir))
-    os.chdir(self.args.output_dir)
-    collection_methods = [self.collect_shared_libs, self.collect_query_profiles,
-        self.collect_native_coredump, self.collect_java_heapdump, self.collect_minidumps,
-        self.collect_thread_stacks]
-    exception_encountered = False
-    for method in collection_methods:
-      try:
-        method()
-      except IOError as e:
-        if e.errno == errno.ENOSPC:
-          # Clean up and abort if we are low on disk space. Other IOErrors are logged and
-          # ignored.
-          logging.exception("Disk space low, aborting.")
-          self.cleanup()
-          return False
-        logging.exception("Encountered an IOError calling: %s" % method.__name__)
-        exception_encountered = True
-      except Exception:
-        exception_encountered = True
-        logging.exception("Encountered an exception calling: %s" % method.__name__)
-    if exception_encountered:
-      logging.error("Encountered an exception collecting diagnostics. Final output " +
-          "could be partial.\n")
-    # Archive the directory, even if it is partial.
-    archive_path = self.collection_root_dir + ".tar.gz"
-    logging.info("Archiving diagnostics to path: %s" % archive_path)
-    shutil.make_archive(self.collection_root_dir, "gztar", self.collection_root_dir)
-    self.cleanup()
-    logging.info("Diagnostics collected at path: %s" % archive_path)
-    return not exception_encountered
-
-def get_args_parser():
-  """Creates the argument parser and adds the flags"""
-  parser = argparse.ArgumentParser(description="Impala diagnostics collection")
-  parser.add_argument("--pid", action="store", dest="pid", type=int, default=0,
-      help="PID of the Impala process for which diagnostics should be collected.")
-  parser.add_argument("--java_home", action="store", dest="java_home", default="",
-      help="If not set, it is set to the JAVA_HOME from the pid's environment.")
-  parser.add_argument("--timeout", action="store", dest="timeout", default=300,
-      type=int, help="Timeout (in seconds) for each of the diagnostics commands")
-  parser.add_argument("--stacks", action="store", dest="stacks", nargs=2, type=int,
-      default=[0, 0], metavar=("COUNT", "INTERVAL (in seconds)"),
-      help="Collect jstack, mixed-mode jstack and pstacks of the Impala process.\
-      Breakpad minidumps are collected in case of missing pstack binaries.")
-  parser.add_argument("--jmap", action="store_true", dest="jmap", default=False,
-      help="Collect heap dump of the Java process")
-  parser.add_argument("--gcore", action="store_true", dest="gcore", default=False,
-      help="Collect the native core dump using gdb. Requires gdb to be installed.")
-  parser.add_argument("--minidumps", action="store", dest="minidumps", type=int,
-      nargs=2, default=[0, 0], metavar=("COUNT", "INTERVAL (in seconds)"),
-      help="Collect breakpad minidumps for the Impala process. Requires --minidumps_dir\
-      be set.")
-  parser.add_argument("--minidumps_dir", action="store", dest="minidumps_dir", default="",
-      help="Path of the directory to which Impala process' minidumps are written")
-  parser.add_argument("--profiles_dir", action="store", dest="profiles_dir", default="",
-      help="Path of the profiles directory to be included in the diagnostics output.")
-  parser.add_argument("--profiles_max_size_limit", action="store",
-      dest="profiles_max_size_limit", default=3*1024*1024*1024,
-      type=float, help="Uncompressed limit (in Bytes) on profile logs collected from\
-      --profiles_dir. Defaults to 3GB.")
-  parser.add_argument("--output_dir", action="store", dest="output_dir",
-      default = tempfile.gettempdir(), help="Output directory that contains the final "
-      "diagnostics data. Defaults to %s" % tempfile.gettempdir())
-  return parser
-
-if __name__ == "__main__":
-  parser = get_args_parser()
-  if len(sys.argv) == 1:
-    parser.print_usage()
-    sys.exit(1)
-  logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
-      format="%(asctime)s %(levelname)-8s %(message)s")
-  diagnostics_handler = ImpalaDiagnosticsHandler(parser.parse_args())
-  logging.info("Running as user: %s" % getpass.getuser())
-  logging.info("Input args: %s" % " ".join(sys.argv))
-  sys.exit(0 if diagnostics_handler.get_diagnostics() else 1)

http://git-wip-us.apache.org/repos/asf/impala/blob/0197b17f/bin/diagnostics/collect_shared_libs.sh
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_shared_libs.sh b/bin/diagnostics/collect_shared_libs.sh
deleted file mode 100755
index d5de349..0000000
--- a/bin/diagnostics/collect_shared_libs.sh
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# $1 - gdb binary path
-# $2 - pid of the Impala process
-# $3 - Output directory to copy the sharedlibs to.
-
-set -euxo pipefail
-
-if [ "$#" -ne 3 ]; then
-  echo "Incorrect usage. Expected: $0 <gdb executable path> <target PID> <output dir>"
-  exit 1
-fi
-
-if [ ! -d $3 ]; then
-  echo "Directory $3 does not exist. This script expects the output directory to exist."
-  exit 1
-fi
-
-# Generate the list of shared libs path to copy.
-shared_libs_to_copy=$(mktemp)
-$1 --pid $2 --batch -ex 'info shared' 2> /dev/null | sed '1,/Shared Object Library/d' |
-    sed 's/\(.*\s\)\(\/.*\)/\2/' | grep \/ > $shared_libs_to_copy
-
-echo "Generated shared library listing for the process."
-
-# Copy the files to the target directory keeping the directory structure intact.
-# We use rsync instead of 'cp --parents' since the latter has permission issues
-# copying from system level directories. https://goo.gl/6yYNhw
-rsync -LR --files-from=$shared_libs_to_copy / $3
-
-echo "Copied the shared libraries to the target directory: $3"
-
-rm -f $shared_libs_to_copy
-# Make sure the impala user has write permissions on all the copied sharedlib paths.
-chmod 755 -R $3

http://git-wip-us.apache.org/repos/asf/impala/blob/0197b17f/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 66a699a..5bb13f0 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -17,7 +17,6 @@ shell/__init__.py
 ssh_keys/id_rsa_impala
 testdata/__init__.py
 tests/__init__.py
-bin/diagnostics/__init__.py
 www/index.html
 
 # See $IMPALA_HOME/LICENSE.txt

http://git-wip-us.apache.org/repos/asf/impala/blob/0197b17f/tests/unittests/test_command.py
----------------------------------------------------------------------
diff --git a/tests/unittests/test_command.py b/tests/unittests/test_command.py
deleted file mode 100644
index a2a9e4c..0000000
--- a/tests/unittests/test_command.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# Unit tests for collect_diagnostics.Command
-
-import os
-import pytest
-import sys
-
-# Update the sys.path to include the modules from bin/diagnostics.
-sys.path.insert(0,
-    os.path.abspath(os.path.join(os.path.dirname(__file__), '../../bin/diagnostics')))
-from collect_diagnostics import Command
-
-class TestCommand(object):
-  """ Unit tests for the Command class"""
-
-  def test_simple_commands(self):
-    # Successful command
-    c = Command(["echo", "foo"], 1000)
-    assert c.run() == 0, "Command expected to succeed, but failed"
-    assert c.stdout.strip("\n") == "foo"
-
-    # Failed command, check return code
-    c = Command(["false"], 1000)
-    assert c.run() == 1
-
-  def test_command_timer(self):
-    # Try to run a command that sleeps for 1000s and set a
-    # timer for 1 second. The command should timed out.
-    c = Command(["sleep", "1000"], 1)
-    assert c.run() != 0, "Command expected to timeout but succeeded."
-    assert c.child_killed_by_timeout, "Command didn't timeout as expected."
-
-


[09/12] impala git commit: Revert "IMPALA-6389: Make '\0' delimited text files work"

Posted by ph...@apache.org.
Revert "IMPALA-6389: Make '\0' delimited text files work"

This reverts commit c2bdaf8af4cf35d3462595c2a341ed84dcf5d960.

An ASAN issue and potentially other problem have been found;
reverting to unbreak the build and tests.

Change-Id: If581311033de8c26e33316b19192c4579594f261
Reviewed-on: http://gerrit.cloudera.org:8080/9851
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Zach Amsden <za...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d3617bcb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d3617bcb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d3617bcb

Branch: refs/heads/2.x
Commit: d3617bcb28664a20d0c4d2048be2b061056106d7
Parents: f69fbcd
Author: Zach Amsden <za...@cloudera.com>
Authored: Thu Mar 29 03:51:41 2018 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 07:17:25 2018 +0000

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser-test.cc  | 56 ++++---------------
 be/src/exec/delimited-text-parser.cc       | 74 ++++++++-----------------
 be/src/exec/delimited-text-parser.h        | 43 +++++---------
 be/src/exec/delimited-text-parser.inline.h | 70 +++++++++++------------
 be/src/exec/hdfs-sequence-scanner.cc       |  2 +-
 be/src/exec/hdfs-sequence-scanner.h        |  3 +-
 be/src/exec/hdfs-text-scanner.cc           |  2 +-
 be/src/exec/hdfs-text-scanner.h            |  3 +-
 8 files changed, 84 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/delimited-text-parser-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser-test.cc b/be/src/exec/delimited-text-parser-test.cc
index d8e977d..3156b36 100644
--- a/be/src/exec/delimited-text-parser-test.cc
+++ b/be/src/exec/delimited-text-parser-test.cc
@@ -24,7 +24,7 @@
 
 namespace impala {
 
-void Validate(TupleDelimitedTextParser* parser, const string& data,
+void Validate(DelimitedTextParser* parser, const string& data,
     int expected_offset, char tuple_delim, int expected_num_tuples,
     int expected_num_fields) {
   parser->ParserReset();
@@ -72,8 +72,8 @@ TEST(DelimitedTextParser, Basic) {
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
-                                            TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+  DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+                                       TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
   // Note that only complete tuples "count"
   Validate(&no_escape_parser, "no_delims", -1, TUPLE_DELIM, 0, 0);
   Validate(&no_escape_parser, "abc||abc", 4, TUPLE_DELIM, 1, 1);
@@ -81,9 +81,9 @@ TEST(DelimitedTextParser, Basic) {
   Validate(&no_escape_parser, "a|bcd", 2, TUPLE_DELIM, 0, 0);
 
   // Test with escape char
-  TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
-                                         TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
-                                         ESCAPE_CHAR);
+  DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+                                    TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+                                    ESCAPE_CHAR);
   Validate(&escape_parser, "a@|a|bcd", 5, TUPLE_DELIM, 0, 0);
   Validate(&escape_parser, "a@@|a|bcd", 4, TUPLE_DELIM, 1, 1);
   Validate(&escape_parser, "a@@@|a|bcd", 7, TUPLE_DELIM, 0, 0);
@@ -127,8 +127,8 @@ TEST(DelimitedTextParser, Fields) {
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
-                                            TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+  DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+                                       TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
 
   Validate(&no_escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
   Validate(&no_escape_parser, "b|c,d|e,f", 2, TUPLE_DELIM, 1, 3);
@@ -137,9 +137,9 @@ TEST(DelimitedTextParser, Fields) {
   const string str10("a,\0|c,d|e", 9);
   Validate(&no_escape_parser, str10, 4, TUPLE_DELIM, 1, 2);
 
-  TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
-                                         TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
-                                         ESCAPE_CHAR);
+  DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+                                    TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+                                    ESCAPE_CHAR);
 
   Validate(&escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
   Validate(&escape_parser, "a,@|c|e,f", 6, TUPLE_DELIM, 0, 1);
@@ -148,20 +148,14 @@ TEST(DelimitedTextParser, Fields) {
 
 TEST(DelimitedTextParser, SpecialDelimiters) {
   const char TUPLE_DELIM = '\n'; // implies '\r' and "\r\n" are also delimiters
-  const char NUL_DELIM = '\0';
   const int NUM_COLS = 1;
 
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
+  DelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
       TUPLE_DELIM);
 
-  TupleDelimitedTextParser nul_delim_parser(NUM_COLS, 0, is_materialized_col, NUL_DELIM);
-
-  TupleDelimitedTextParser nul_field_parser(2, 0, is_materialized_col,
-                                            TUPLE_DELIM, NUL_DELIM);
-
   // Non-SSE case
   Validate(&tuple_delim_parser, "A\r\nB", 3, TUPLE_DELIM, 0, 0);
   Validate(&tuple_delim_parser, "A\rB", 2, TUPLE_DELIM, 0, 0);
@@ -171,16 +165,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
   Validate(&tuple_delim_parser, "A\rB\nC\r\nD", 2, TUPLE_DELIM, 2, 2);
   Validate(&tuple_delim_parser, "\r\r\n\n", 1, TUPLE_DELIM, 2, 2);
 
-  // NUL tuple delimiter; no field delimiter
-  const string nul1("\0\0\0", 3);
-  const string nul2("AAA\0BBB\0", 8);
-  const string nul3("\n\0\r\0\r\n\0", 7);
-  const string nul4("\n\0\r\0\r\n", 6);
-  Validate(&nul_delim_parser, nul1, 1, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nul2, 4, NUL_DELIM, 1, 1);
-  Validate(&nul_delim_parser, nul3, 2, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nul4, 2, NUL_DELIM, 1, 1);
-
   // SSE case
   string data = "\rAAAAAAAAAAAAAAA";
   DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -194,22 +178,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
   data = "\r\nAAA\n\r\r\nAAAAAAA";
   DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
   Validate(&tuple_delim_parser, data, 2, TUPLE_DELIM, 3, 3);
-
-  // NUL SSE case
-  const string nulsse1("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0\0", 32);
-  const string nulsse2("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0A", 32);
-  const string nulsse3("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,ddd\0", 36);
-  const string nulsse4("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,dddd", 36);
-  Validate(&nul_delim_parser, nulsse1, 6, NUL_DELIM, 3, 3);
-  Validate(&nul_delim_parser, nulsse2, 6, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nulsse3, 4, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nulsse4, 4, NUL_DELIM, 1, 1);
-
-  // NUL Field delimiters
-  const string field1("\na\0b\0c\n", 7);
-  const string field2("aaaa\na\0b\0c\naaaaa\0b\na\0b\0c\n", 25);
-  Validate(&nul_field_parser, field1, 1, TUPLE_DELIM, 1, 2);
-  Validate(&nul_field_parser, field2, 5, TUPLE_DELIM, 3, 6);
 }
 
 // TODO: expand test for other delimited text parser functions/cases.

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 7db65fd..18fcde1 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -24,8 +24,7 @@
 
 using namespace impala;
 
-template<bool DELIMITED_TUPLES>
-DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
+DelimitedTextParser::DelimitedTextParser(
     int num_cols, int num_partition_keys, const bool* is_materialized_col,
     char tuple_delim, char field_delim, char collection_item_delim, char escape_char)
     : is_materialized_col_(is_materialized_col),
@@ -73,7 +72,7 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
     memset(low_mask_, 0, sizeof(low_mask_));
   }
 
-  if (DELIMITED_TUPLES) {
+  if (tuple_delim != '\0') {
     search_chars[num_delims_++] = tuple_delim_;
     ++num_tuple_delims_;
     // Hive will treats \r (^M) as an alternate tuple delimiter, but \r\n is a
@@ -83,12 +82,12 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
       ++num_tuple_delims_;
     }
     xmm_tuple_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
-    if (field_delim_ != tuple_delim_) search_chars[num_delims_++] = field_delim_;
-  } else {
-    search_chars[num_delims_++] = field_delim_;
   }
 
-  if (collection_item_delim != '\0') search_chars[num_delims_++] = collection_item_delim_;
+  if (field_delim != '\0' || collection_item_delim != '\0') {
+    search_chars[num_delims_++] = field_delim_;
+    search_chars[num_delims_++] = collection_item_delim_;
+  }
 
   DCHECK_GT(num_delims_, 0);
   xmm_delim_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
@@ -96,30 +95,16 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
   ParserReset();
 }
 
-template
-DelimitedTextParser<true>::DelimitedTextParser(
-    int num_cols, int num_partition_keys, const bool* is_materialized_col,
-    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template
-DelimitedTextParser<false>::DelimitedTextParser(
-    int num_cols, int num_partition_keys, const bool* is_materialized_col,
-    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template<bool DELIMITED_TUPLES>
-void DelimitedTextParser<DELIMITED_TUPLES>::ParserReset() {
+void DelimitedTextParser::ParserReset() {
   current_column_has_escape_ = false;
   last_char_is_escape_ = false;
   last_row_delim_offset_ = -1;
   column_idx_ = num_partition_keys_;
 }
 
-template void DelimitedTextParser<true>::ParserReset();
-
 // Parsing raw csv data into FieldLocation descriptors.
-template<bool DELIMITED_TUPLES>
-Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
+Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remaining_len,
+    char** byte_buffer_ptr, char** row_end_locations,
     FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
   // Start of this batch.
@@ -148,10 +133,10 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
   while (remaining_len > 0) {
     bool new_tuple = false;
     bool new_col = false;
-    if (DELIMITED_TUPLES) unfinished_tuple_ = true;
+    unfinished_tuple_ = true;
 
     if (!last_char_is_escape_) {
-      if (DELIMITED_TUPLES && (**byte_buffer_ptr == tuple_delim_ ||
+      if (tuple_delim_ != '\0' && (**byte_buffer_ptr == tuple_delim_ ||
            (tuple_delim_ == '\n' && **byte_buffer_ptr == '\r'))) {
         new_tuple = true;
         new_col = true;
@@ -181,7 +166,6 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
         row_end_locations[*num_tuples] = *byte_buffer_ptr;
         ++(*num_tuples);
       }
-      DCHECK(DELIMITED_TUPLES);
       unfinished_tuple_ = false;
       last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
       if (*num_tuples == max_tuples) {
@@ -201,7 +185,7 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
 
   // For formats that store the length of the row, the row is not delimited:
   // e.g. Sequence files.
-  if (!DELIMITED_TUPLES) {
+  if (tuple_delim_ == '\0') {
     DCHECK_EQ(remaining_len, 0);
     RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
         next_column_start, num_fields, field_locations));
@@ -209,30 +193,18 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
     DCHECK(status.ok());
     column_idx_ = num_partition_keys_;
     ++(*num_tuples);
+    unfinished_tuple_ = false;
   }
   return Status::OK();
 }
 
-template
-Status DelimitedTextParser<true>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
-    FieldLocation* field_locations,
-    int* num_tuples, int* num_fields, char** next_column_start);
-
-template
-Status DelimitedTextParser<false>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
-    FieldLocation* field_locations,
-    int* num_tuples, int* num_fields, char** next_column_start);
-
-template<bool DELIMITED_TUPLES>
-int64_t DelimitedTextParser<DELIMITED_TUPLES>::FindFirstInstance(const char* buffer,
-    int64_t len) {
+// Find the first instance of the tuple delimiter. This will find the start of the first
+// full tuple in buffer by looking for the end of the previous tuple.
+int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
   int64_t tuple_start = 0;
   const char* buffer_start = buffer;
   bool found = false;
 
-  DCHECK(DELIMITED_TUPLES);
   // If the last char in the previous buffer was \r then either return the start of
   // this buffer or skip a \n at the beginning of the buffer.
   if (last_row_delim_offset_ != -1) {
@@ -254,10 +226,13 @@ restart:
       int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0);
       if (tuple_mask != 0) {
         found = true;
-        // Find first set bit (1-based)
-        int i = ffs(tuple_mask);
-        tuple_start += i;
-        buffer += i;
+        for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) {
+          if ((tuple_mask & SSEUtil::SSE_BITMASK[i]) != 0) {
+            tuple_start += i + 1;
+            buffer += i + 1;
+            break;
+          }
+        }
         break;
       }
       tuple_start += SSEUtil::CHARS_PER_128_BIT_REGISTER;
@@ -320,6 +295,3 @@ restart:
   }
   return tuple_start;
 }
-
-template
-int64_t DelimitedTextParser<true>::FindFirstInstance(const char* buffer, int64_t len);

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index 9b89127..b966081 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -25,27 +25,22 @@
 
 namespace impala {
 
-template <bool DELIMITED_TUPLES>
 class DelimitedTextParser {
  public:
 
   /// The Delimited Text Parser parses text rows that are delimited by specific
   /// characters:
-  ///   tuple_delim: delimits tuples.  Only used if DELIMITED_TUPLES is true.
+  ///   tuple_delim: delimits tuples
   ///   field_delim: delimits fields
   ///   collection_item_delim: delimits collection items
   ///   escape_char: escape delimiters, make them part of the data.
-  ///
-  /// If the template parameter DELIMITED_TUPLES is false there is no support
-  /// for tuple delimiters and we do not need to search for them.  Any value
-  /// may be passed for tuple_delim, as it is ignored.
-  ///
+  //
   /// 'num_cols' is the total number of columns including partition keys.
-  ///
+  //
   /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
   /// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
   /// Owned by caller.
-  ///
+  //
   /// The main method is ParseData which fills in a vector of pointers and lengths to the
   /// fields.  It also can handle an escape character which masks a tuple or field
   /// delimiter that occurs in the data.
@@ -96,14 +91,14 @@ class DelimitedTextParser {
   /// This function is used to parse sequence file records which do not need to
   /// parse for tuple delimiters. Returns an error status if any column exceeds the
   /// size limit. See AddColumn() for details.
-  /// This function is disabled for non-sequence file parsing.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
       int* num_fields);
 
   /// FindFirstInstance returns the position after the first non-escaped tuple
   /// delimiter from the starting offset.
   /// Used to find the start of a tuple if jumping into the middle of a text file.
+  /// Also used to find the sync marker for Sequenced and RC files.
   /// If no tuple delimiter is found within the buffer, return -1;
   int64_t FindFirstInstance(const char* buffer, int64_t len);
 
@@ -124,16 +119,13 @@ class DelimitedTextParser {
   /// by the number fields added.
   /// 'field_locations' will be updated with the start and length of the fields.
   /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status FillColumns(int64_t len, char** last_column, int* num_fields,
       impala::FieldLocation* field_locations);
 
   /// Return true if we have not seen a tuple delimiter for the current tuple being
   /// parsed (i.e., the last byte read was not a tuple delimiter).
-  bool HasUnfinishedTuple() {
-    DCHECK(DELIMITED_TUPLES);
-    return unfinished_tuple_;
-  }
+  bool HasUnfinishedTuple() { return unfinished_tuple_; }
 
  private:
   /// Initialize the parser state.
@@ -141,7 +133,7 @@ class DelimitedTextParser {
 
   /// Helper routine to add a column to the field_locations vector.
   /// Template parameter:
-  ///   PROCESS_ESCAPES -- if true the the column may have escape characters
+  ///   process_escapes -- if true the the column may have escape characters
   ///                      and the negative of the len will be stored.
   ///   len: length of the current column. The length of a column must fit in a 32-bit
   ///        signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
@@ -152,29 +144,23 @@ class DelimitedTextParser {
   /// Output:
   ///   field_locations: updated with start and length of current field.
   /// Return an error status if 'len' exceeds the size limit specified above.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
       FieldLocation* field_locations);
 
   /// Helper routine to parse delimited text using SSE instructions.
   /// Identical arguments as ParseFieldLocations.
-  /// If the template argument, 'PROCESS_ESCAPES' is true, this function will handle
+  /// If the template argument, 'process_escapes' is true, this function will handle
   /// escapes, otherwise, it will assume the text is unescaped.  By using templates,
   /// we can special case the un-escaped path for better performance.  The unescaped
   /// path is optimized away by the compiler. Returns an error status if the length
   /// of any column exceeds the size limit. See AddColumn() for details.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status ParseSse(int max_tuples, int64_t* remaining_len,
       char** byte_buffer_ptr, char** row_end_locations_,
       FieldLocation* field_locations,
       int* num_tuples, int* num_fields, char** next_column_start);
 
-  bool IsFieldOrCollectionItemDelimiter(char c) {
-    return (!DELIMITED_TUPLES && c == field_delim_) ||
-      (DELIMITED_TUPLES && field_delim_ != tuple_delim_ && c == field_delim_) ||
-      (collection_item_delim_ != '\0' && c == collection_item_delim_);
-  }
-
   /// SSE(xmm) register containing the tuple search character(s).
   __m128i xmm_tuple_search_;
 
@@ -228,7 +214,7 @@ class DelimitedTextParser {
   /// Character delimiting collection items (to become slots).
   char collection_item_delim_;
 
-  /// Character delimiting tuples.  Only used if DELIMITED_TUPLES is true.
+  /// Character delimiting tuples.
   char tuple_delim_;
 
   /// Whether or not the current column has an escape character in it
@@ -242,8 +228,5 @@ class DelimitedTextParser {
   bool unfinished_tuple_;
 };
 
-using TupleDelimitedTextParser = DelimitedTextParser<true>;
-using SequenceDelimitedTextParser = DelimitedTextParser<false>;
-
 }// namespace impala
 #endif// IMPALA_EXEC_DELIMITED_TEXT_PARSER_H

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 9fe737e..02fa132 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -52,10 +52,9 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
   *delim_mask &= ~escape_mask;
 }
 
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
-    char** next_column_start, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
+    int* num_fields, FieldLocation* field_locations) {
   if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
     return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
   }
@@ -63,27 +62,26 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
     // Found a column that needs to be parsed, write the start/len to 'field_locations'
     field_locations[*num_fields].start = *next_column_start;
     int64_t field_len = len;
-    if (PROCESS_ESCAPES && current_column_has_escape_) {
+    if (process_escapes && current_column_has_escape_) {
       field_len = -len;
     }
     field_locations[*num_fields].len = static_cast<int32_t>(field_len);
     ++(*num_fields);
   }
-  if (PROCESS_ESCAPES) current_column_has_escape_ = false;
+  if (process_escapes) current_column_has_escape_ = false;
   *next_column_start += len + 1;
   ++column_idx_;
   return Status::OK();
 }
 
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
-    char** last_column, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
+    int* num_fields, FieldLocation* field_locations) {
   // Fill in any columns missing from the end of the tuple.
   char* dummy = NULL;
   if (last_column == NULL) last_column = &dummy;
   while (column_idx_ < num_cols_) {
-    RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(len, last_column,
+    RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
         num_fields, field_locations));
     // The rest of the columns will be null.
     last_column = &dummy;
@@ -105,9 +103,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
 ///  Needle   = 'abcd000000000000' (we're searching for any a's, b's, c's or d's)
 ///  Haystack = 'asdfghjklhjbdwwc' (the raw string)
 ///  Result   = '1010000000011001'
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSse(int max_tuples,
     int64_t* remaining_len, char** byte_buffer_ptr,
     char** row_end_locations, FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
@@ -149,7 +146,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
     uint16_t escape_mask = 0;
     // If the table does not use escape characters, skip processing for it.
-    if (PROCESS_ESCAPES) {
+    if (process_escapes) {
       DCHECK(escape_char_ != '\0');
       xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
           xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -159,10 +156,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
     char* last_char = *byte_buffer_ptr + 15;
     bool last_char_is_unescaped_delim = delim_mask >> 15;
-    if (DELIMITED_TUPLES) {
-      unfinished_tuple_ = !(last_char_is_unescaped_delim &&
-          (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
-    }
+    unfinished_tuple_ = !(last_char_is_unescaped_delim &&
+        (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
 
     int last_col_idx = 0;
     // Process all non-zero bits in the delim_mask from lsb->msb.  If a bit
@@ -175,7 +170,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
       // clear current bit
       delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         // Determine if there was an escape character between [last_col_idx, n]
         bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
         current_column_has_escape_ |= escaped;
@@ -184,14 +179,13 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
       char* delim_ptr = *byte_buffer_ptr + n;
 
-      if (IsFieldOrCollectionItemDelimiter(*delim_ptr)) {
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+      if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
             next_column_start, num_fields, field_locations));
         continue;
       }
 
-      if (DELIMITED_TUPLES &&
-          (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r'))) {
+      if (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r')) {
         if (UNLIKELY(
                 last_row_delim_offset_ == *remaining_len - n && *delim_ptr == '\n')) {
           // If the row ended in \r\n then move the next start past the \n
@@ -199,7 +193,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
           last_row_delim_offset_ = -1;
           continue;
         }
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
             next_column_start, num_fields, field_locations));
         Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
         DCHECK(status.ok());
@@ -210,7 +204,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
         last_row_delim_offset_ = *delim_ptr == '\r' ? *remaining_len - n - 1 : -1;
         if (UNLIKELY(*num_tuples == max_tuples)) {
           (*byte_buffer_ptr) += (n + 1);
-          if (PROCESS_ESCAPES) last_char_is_escape_ = false;
+          if (process_escapes) last_char_is_escape_ = false;
           *remaining_len -= (n + 1);
           // If the last character we processed was \r then set the offset to 0
           // so that we will use it at the beginning of the next batch.
@@ -220,7 +214,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
       }
     }
 
-    if (PROCESS_ESCAPES) {
+    if (process_escapes) {
       // Determine if there was an escape character between (last_col_idx, 15)
       bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
       current_column_has_escape_ |= unprocessed_escape;
@@ -233,10 +227,9 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 }
 
 /// Simplified version of ParseSSE which does not handle tuple delimiters.
-template<>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len,
-    char* buffer, FieldLocation* field_locations, int* num_fields) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+    FieldLocation* field_locations, int* num_fields) {
   char* next_column_start = buffer;
   __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
 
@@ -253,7 +246,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
 
       uint16_t escape_mask = 0;
       // If the table does not use escape characters, skip processing for it.
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         DCHECK(escape_char_ != '\0');
         xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
             xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -270,7 +263,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
         DCHECK_GE(n, 0);
         DCHECK_LT(n, 16);
 
-        if (PROCESS_ESCAPES) {
+        if (process_escapes) {
           // Determine if there was an escape character between [last_col_idx, n]
           bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
           current_column_has_escape_ |= escaped;
@@ -280,11 +273,11 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
         // clear current bit
         delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer + n - next_column_start,
+        RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
             &next_column_start, num_fields, field_locations));
       }
 
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         // Determine if there was an escape character between (last_col_idx, 15)
         bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
         current_column_has_escape_ |= unprocessed_escape;
@@ -303,8 +296,9 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
       last_char_is_escape_ = false;
     }
 
-    if (!last_char_is_escape_ && IsFieldOrCollectionItemDelimiter(*buffer)) {
-      RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer - next_column_start,
+    if (!last_char_is_escape_ &&
+          (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
+      RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
           &next_column_start, num_fields, field_locations));
     }
 
@@ -314,7 +308,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
 
   // Last column does not have a delimiter after it.  Add that column and also
   // pad with empty cols if the input is ragged.
-  return FillColumns<PROCESS_ESCAPES>(buffer - next_column_start,
+  return FillColumns<process_escapes>(buffer - next_column_start,
       &next_column_start, num_fields, field_locations);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..346a18a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -73,7 +73,7 @@ Status HdfsSequenceScanner::InitNewRange() {
   text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
       scan_node_->hdfs_table()->null_column_value()));
 
-  delimited_text_parser_.reset(new SequenceDelimitedTextParser(
+  delimited_text_parser_.reset(new DelimitedTextParser(
       scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
       scan_node_->is_materialized_col(), '\0', hdfs_partition->field_delim(),
       hdfs_partition->collection_delim(), hdfs_partition->escape_char()));

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..4845edb 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -153,7 +153,6 @@
 
 namespace impala {
 
-template <bool>
 class DelimitedTextParser;
 
 class HdfsSequenceScanner : public BaseSequenceScanner {
@@ -223,7 +222,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
 
   /// Helper class for picking fields and rows from delimited text.
-  boost::scoped_ptr<DelimitedTextParser<false>> delimited_text_parser_;
+  boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
   std::vector<FieldLocation> field_locations_;
 
   /// Data that is fixed across headers.  This struct is shared between scan ranges.

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index b78115d..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -203,7 +203,7 @@ Status HdfsTextScanner::InitNewRange() {
     collection_delim = '\0';
   }
 
-  delimited_text_parser_.reset(new TupleDelimitedTextParser(
+  delimited_text_parser_.reset(new DelimitedTextParser(
       scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
       scan_node_->is_materialized_col(), hdfs_partition->line_delim(),
       field_delim, collection_delim, hdfs_partition->escape_char()));

http://git-wip-us.apache.org/repos/asf/impala/blob/d3617bcb/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 25886ba..610c612 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -25,7 +25,6 @@
 
 namespace impala {
 
-template<bool>
 class DelimitedTextParser;
 class ScannerContext;
 struct HdfsFileDesc;
@@ -238,7 +237,7 @@ class HdfsTextScanner : public HdfsScanner {
   int slot_idx_;
 
   /// Helper class for picking fields and rows from delimited text.
-  boost::scoped_ptr<DelimitedTextParser<true>> delimited_text_parser_;
+  boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
 
   /// Return field locations from the Delimited Text Parser.
   std::vector<FieldLocation> field_locations_;


[02/12] impala git commit: IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender

Posted by ph...@apache.org.
IMPALA-6685: Improve profiles in KrpcDataStreamRecvr and KrpcDataStreamSender

This change implements a couple of improvements to the profiles of
KrpcDataStreamRecvr and KrpcDataStreamSender:

- track pending number of deferred row batches over time in KrpcDataStreamRecvr
- track the number of bytes dequeued over time in KrpcDataStreamRecvr
- track the total time deferred RPCs queues are not empty
- track the number of bytes sent from KrpcDataStreamSender over time
- track the total amount of time spent in KrpcDataStreamSender, including time
  spent waiting for RPC completion.

Sample profile of an Exchange node instance:

          EXCHANGE_NODE (id=21):(Total: 2s284ms, non-child: 64.926ms, % non-child: 2.84%)
             - ConvertRowBatchTime: 44.380ms
             - PeakMemoryUsage: 124.04 KB (127021)
             - RowsReturned: 287.51K (287514)
             - RowsReturnedRate: 125.88 K/sec
            Buffer pool:
               - AllocTime: 1.109ms
               - CumulativeAllocationBytes: 10.96 MB (11493376)
               - CumulativeAllocations: 562 (562)
               - PeakReservation: 112.00 KB (114688)
               - PeakUnpinnedBytes: 0
               - PeakUsedReservation: 112.00 KB (114688)
               - ReadIoBytes: 0
               - ReadIoOps: 0 (0)
               - ReadIoWaitTime: 0.000ns
               - WriteIoBytes: 0
               - WriteIoOps: 0 (0)
               - WriteIoWaitTime: 0.000ns
            Dequeue:
              BytesDequeued(500.000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700.00 KB, 2.00 MB, 3.49 MB, 4.39 MB, 5.86 MB, 6.85 MB
               - FirstBatchWaitTime: 0.000ns
               - TotalBytesDequeued: 6.85 MB (7187850)
               - TotalGetBatchTime: 2s237ms
                 - DataWaitTime: 2s219ms
            Enqueue:
              BytesReceived(500.000ms): 0, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 23.36 KB, 328.73 KB, 963.79 KB, 1.64 MB, 2.09 MB, 2.76 MB, 3.23 MB
              DeferredQueueSize(500.000ms): 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0
               - DispatchTime: (Avg: 108.593us ; Min: 30.525us ; Max: 1.524ms ; Number of samples: 281)
               - DeserializeRowBatchTime: 8.395ms
               - TotalBatchesEnqueued: 281 (281)
               - TotalBatchesReceived: 281 (281)
               - TotalBytesReceived: 3.23 MB (3387144)
               - TotalEarlySenders: 0 (0)
               - TotalEosReceived: 1 (1)
               - TotalHasDeferredRPCsTime: 15s446ms
               - TotalRPCsDeferred: 38 (38)

Sample sender's profile:

        KrpcDataStreamSender (dst_id=21):(Total: 17s923ms, non-child: 604.494ms, % non-child: 3.37%)
          BytesSent(500.000ms): 0, 0, 0, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 34.78 KB, 46.54 KB, 46.54 KB, 46.54 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 58.31 KB, 974.44 KB, 2.82 MB, 4.93 MB, 6.27 MB, 8.28 MB, 9.69 MB
           - EosSent: 3 (3)
           - NetworkThroughput: 4.61 MB/sec
           - PeakMemoryUsage: 22.57 KB (23112)
           - RowsSent: 287.51K (287514)
           - RpcFailure: 0 (0)
           - RpcRetry: 0 (0)
           - SerializeBatchTime: 329.162ms
           - TotalBytesSent: 9.69 MB (10161432)
           - UncompressedRowBatchSize: 20.56 MB (21563550)

Change-Id: I8ba405921b3df920c1e85b940ce9c8d02fc647cd
Reviewed-on: http://gerrit.cloudera.org:8080/9690
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3855cb0f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3855cb0f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3855cb0f

Branch: refs/heads/2.x
Commit: 3855cb0f59b16d9051dc44efb08df7bb9a337e66
Parents: fc0af7f
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Mar 15 19:42:10 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 28 23:59:02 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc        |   2 -
 be/src/runtime/krpc-data-stream-mgr.cc    |   2 +-
 be/src/runtime/krpc-data-stream-recvr.cc  | 206 +++++++++++++++++--------
 be/src/runtime/krpc-data-stream-recvr.h   |  91 +++++++----
 be/src/runtime/krpc-data-stream-sender.cc |  58 +++----
 be/src/runtime/krpc-data-stream-sender.h  |  16 +-
 be/src/runtime/runtime-state.h            |   6 +-
 be/src/service/data-stream-service.cc     |  21 ++-
 be/src/service/data-stream-service.h      |   5 +
 common/protobuf/data_stream_service.proto |   6 +
 10 files changed, 278 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index bd76b57..0a0d81e 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -746,7 +746,6 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
   JoinSenders();
   EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
-  EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
 TEST_P(DataStreamTest, UnknownSenderLargeResult) {
@@ -756,7 +755,6 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   StartSender();
   JoinSenders();
   EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
-  EXPECT_EQ(sender_info_[0].num_bytes_sent, 0);
 }
 
 TEST_P(DataStreamTest, Cancel) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 5a9305f..cd8d90b 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -255,7 +255,7 @@ void KrpcDataStreamMgr::DeserializeThreadFn(int thread_id, const DeserializeTask
     recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
     DCHECK(recvr != nullptr || already_unregistered);
   }
-  if (recvr != nullptr) recvr->DequeueDeferredRpc(task.sender_id);
+  if (recvr != nullptr) recvr->ProcessDeferredRpc(task.sender_id);
 }
 
 void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index c7126d4..6e47bd6 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -25,6 +25,7 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-mgr.h"
@@ -35,6 +36,7 @@
 #include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
 #include "util/test-info.h"
+#include "util/time.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
 
@@ -43,6 +45,8 @@
 DECLARE_bool(use_krpc);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 
+using kudu::MonoDelta;
+using kudu::MonoTime;
 using kudu::rpc::RpcContext;
 using std::condition_variable_any;
 
@@ -82,7 +86,7 @@ class KrpcDataStreamRecvr::SenderQueue {
   // On success, the first entry of 'deferred_rpcs_' is removed and the sender of the RPC
   // will be responded to. If the serialized row batch fails to be extracted from the
   // entry, the error status will be sent as reply.
-  void DequeueDeferredRpc();
+  void ProcessDeferredRpc();
 
   // Takes over the RPC state 'ctx' of an early sender for deferred processing and
   // kicks off a deserialization task to process it asynchronously. The ownership of
@@ -110,12 +114,26 @@ class KrpcDataStreamRecvr::SenderQueue {
   // soft limit of the receiver to be exceeded. Expected to be called with lock_ held.
   bool CanEnqueue(int64_t batch_size) const;
 
+  // Helper function for inserting 'payload' into 'deferred_rpcs_'. Also does some
+  // accounting for various counters.
+  void EnqueueDeferredRpc(unique_ptr<TransmitDataCtx> payload);
+
+  // Helper function for removing the first item from 'deferred_rpcs_'. Also does some
+  // accounting for various counters.
+  void DequeueDeferredRpc();
+
   // Unpacks a serialized row batch from 'request' and 'rpc_context' and populates
-  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will
-  // be stored in 'batch_size'. On failure, the error status is returned.
+  // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch sizes is
+  // stored in 'deserialized_size'. If 'serialized_size' is not NULL, also stores the
+  // serialized row batch size in it. On failure, the error status is returned.
   static Status UnpackRequest(const TransmitDataRequestPB* request,
       RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data,
-      int64_t* batch_size);
+      int64_t* deserialized_size, int64_t* serialized_size = nullptr);
+
+  // Helper function to compute the serialized row batch size from 'request'
+  // and 'rpc_context'. Returns 0 on failure to unpack the serialized row batch.
+  static int64_t GetSerializedBatchSize(const TransmitDataRequestPB* request,
+      RpcContext* rpc_context);
 
   // The workhorse function for deserializing a row batch represented by ('header',
   // 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue'. Expects to be
@@ -183,6 +201,11 @@ class KrpcDataStreamRecvr::SenderQueue {
   // full when they last tried to do so. The senders wait here until there is a space for
   // their batches, allowing the receiver-side to implement basic flow-control.
   std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
+
+  // Monotonic time in nanoseconds of when 'deferred_rpcs_' goes from being empty to
+  // non-empty. Set to 0 when 'deferred_rpcs_' becomes empty again. Used for computing
+  // 'total_has_deferred_rpcs_timer_'.
+  int64_t has_deferred_rpcs_start_time_ns_ = 0;
 };
 
 KrpcDataStreamRecvr::SenderQueue::SenderQueue(
@@ -190,7 +213,7 @@ KrpcDataStreamRecvr::SenderQueue::SenderQueue(
   : recvr_(parent_recvr), num_remaining_senders_(num_senders) { }
 
 Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
-  SCOPED_TIMER(recvr_->queue_get_batch_time_);
+  SCOPED_TIMER(recvr_->queue_get_batch_timer_);
   DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   DCHECK(!recvr_->closed_);
   int num_to_dequeue = 0;
@@ -204,10 +227,15 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
 
     // Wait until something shows up or we know we're done
     while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) {
+      // Verify before waiting on 'data_arrival_cv_' that if there are any deferred
+      // batches, either there is outstanding deserialization request queued or there
+      // is pending insertion so this thread is guaranteed to wake up at some point.
+      DCHECK(deferred_rpcs_.empty() ||
+          (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0);
       VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
                << " node=" << recvr_->dest_node_id();
       // Don't count time spent waiting on the sender as active time.
-      CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
+      CANCEL_SAFE_SCOPED_TIMER(recvr_->data_wait_timer_, &is_cancelled_);
       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
       CANCEL_SAFE_SCOPED_TIMER(
           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
@@ -248,7 +276,9 @@ Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
     DCHECK(!batch_queue_.empty());
     received_first_batch_ = true;
     RowBatch* result = batch_queue_.front().second.release();
-    recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
+    int64_t batch_size = batch_queue_.front().first;
+    COUNTER_ADD(recvr_->bytes_dequeued_counter_, batch_size);
+    recvr_->num_buffered_bytes_.Add(-batch_size);
     batch_queue_.pop_front();
     VLOG_ROW << "fetched #rows=" << result->num_rows();
     current_batch_.reset(result);
@@ -276,9 +306,29 @@ inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) con
   return queue_empty || !recvr_->ExceedsLimit(batch_size);
 }
 
+void KrpcDataStreamRecvr::SenderQueue::EnqueueDeferredRpc(
+    unique_ptr<TransmitDataCtx> payload) {
+  if (deferred_rpcs_.empty()) has_deferred_rpcs_start_time_ns_ = MonotonicNanos();
+  deferred_rpcs_.push(move(payload));
+  recvr_->num_deferred_rpcs_.Add(1);
+  COUNTER_ADD(recvr_->total_deferred_rpcs_counter_, 1);
+}
+
+void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+  deferred_rpcs_.pop();
+  if (deferred_rpcs_.empty()) {
+    DCHECK_NE(has_deferred_rpcs_start_time_ns_, 0);
+    int64_t duration = MonotonicNanos() - has_deferred_rpcs_start_time_ns_;
+    COUNTER_ADD(recvr_->total_has_deferred_rpcs_timer_, duration);
+    has_deferred_rpcs_start_time_ns_ = 0;
+  }
+  recvr_->num_deferred_rpcs_.Add(-1);
+}
+
 Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
     const TransmitDataRequestPB* request, RpcContext* rpc_context,
-    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) {
+    kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* deserialized_size,
+    int64_t* serialized_size) {
   // Unpack the tuple offsets.
   KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
       request->tuple_offsets_sidecar_idx(), tuple_offsets),
@@ -288,8 +338,12 @@ Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
       request->tuple_data_sidecar_idx(), tuple_data),
       "Failed to get the tuple data sidecar");
   // Compute the size of the deserialized row batch.
-  *batch_size =
+  *deserialized_size =
       RowBatch::GetDeserializedSize(request->row_batch_header(), *tuple_offsets);
+  // Compute the size of the serialized row batch.
+  if (serialized_size != nullptr) {
+    *serialized_size = tuple_offsets->size() + tuple_data->size();
+  }
   return Status::OK();
 }
 
@@ -300,8 +354,6 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   DCHECK(lock->owns_lock());
   DCHECK(!is_cancelled_);
 
-  COUNTER_ADD(recvr_->num_received_batches_, 1);
-  COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
   // Bump 'num_pending_enqueue_' to avoid race with Close() when lock is dropped below.
@@ -331,7 +383,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
-  COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
+  COUNTER_ADD(recvr_->total_enqueued_batches_counter_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
   return Status::OK();
@@ -344,14 +396,15 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   kudu::Slice tuple_offsets;
   kudu::Slice tuple_data;
   int64_t batch_size;
-  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
       &batch_size);
   if (UNLIKELY(!status.ok())) {
-    status.ToProto(response->mutable_status());
-    rpc_context->RespondSuccess();
+    DataStreamService::RespondRpc(status, response, rpc_context);
     return;
   }
+  COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+  // To be consistent with the senders, only count the sidecars size.
+  COUNTER_ADD(recvr_->bytes_received_counter_, tuple_data.size() + tuple_offsets.size());
 
   {
     unique_lock<SpinLock> l(lock_);
@@ -361,8 +414,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
     // responded to if we reach here.
     DCHECK_GT(num_remaining_senders_, 0);
     if (UNLIKELY(is_cancelled_)) {
-      Status::OK().ToProto(response->mutable_status());
-      rpc_context->RespondSuccess();
+      DataStreamService::RespondRpc(Status::OK(), response, rpc_context);
       return;
     }
 
@@ -375,8 +427,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
     if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
       recvr_->deferred_rpc_tracker()->Consume(rpc_context->GetTransferSize());
       auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
-      deferred_rpcs_.push(move(payload));
-      COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+      EnqueueDeferredRpc(move(payload));
       return;
     }
 
@@ -385,11 +436,10 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   }
 
   // Respond to the sender to ack the insertion of the row batches.
-  status.ToProto(response->mutable_status());
-  rpc_context->RespondSuccess();
+  DataStreamService::RespondRpc(status, response, rpc_context);
 }
 
-void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
+void KrpcDataStreamRecvr::SenderQueue::ProcessDeferredRpc() {
   // Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
   std::unique_ptr<TransmitDataCtx> ctx;
   Status status;
@@ -413,7 +463,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     if (UNLIKELY(!status.ok())) {
       DataStreamService::RespondAndReleaseRpc(status, ctx->response, ctx->rpc_context,
           recvr_->deferred_rpc_tracker());
-      deferred_rpcs_.pop();
+      DequeueDeferredRpc();
       return;
     }
 
@@ -426,7 +476,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     }
 
     // Dequeues the deferred batch and adds it to 'batch_queue_'.
-    deferred_rpcs_.pop();
+    DequeueDeferredRpc();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
     DCHECK(!status.ok() || !batch_queue_.empty());
@@ -437,8 +487,20 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
 
   // Responds to the sender to ack the insertion of the row batches.
   // No need to hold lock when enqueuing the response.
-  status.ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  DataStreamService::RespondRpc(status, ctx->response, ctx->rpc_context);
+}
+
+int64_t KrpcDataStreamRecvr::SenderQueue::GetSerializedBatchSize(
+    const TransmitDataRequestPB* request, RpcContext* rpc_context) {
+  kudu::Slice tuple_offsets;
+  kudu::Slice tuple_data;
+  int64_t unused;
+  int64_t serialized_size = 0;
+  if (UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data, &unused,
+          &serialized_size).ok()) {
+    return serialized_size;
+  }
+  return 0;
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
@@ -448,21 +510,21 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
   // 'recvr_->mgr_' shouldn't be NULL.
   DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   DCHECK(!recvr_->closed_ && recvr_->mgr_ != nullptr);
+  COUNTER_ADD(recvr_->total_received_batches_counter_, 1);
+  COUNTER_ADD(recvr_->bytes_received_counter_,
+      GetSerializedBatchSize(ctx->request, ctx->rpc_context));
   int sender_id = ctx->request->sender_id();
-  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
     if (UNLIKELY(is_cancelled_)) {
-      Status::OK().ToProto(ctx->response->mutable_status());
-      ctx->rpc_context->RespondSuccess();
+      DataStreamService::RespondRpc(Status::OK(), ctx->response, ctx->rpc_context);
       return;
     }
     // Only enqueue a deferred RPC if the sender queue is not yet cancelled.
     recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
-    deferred_rpcs_.push(move(ctx));
+    EnqueueDeferredRpc(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
-  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
       recvr_->dest_node_id(), sender_id, 1);
 }
@@ -489,7 +551,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
       const unique_ptr<TransmitDataCtx>& ctx = deferred_rpcs_.front();
       DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
           ctx->rpc_context, recvr_->deferred_rpc_tracker());
-      deferred_rpcs_.pop();
+      DequeueDeferredRpc();
     }
   }
   VLOG_QUERY << "cancelled stream: fragment_instance_id="
@@ -565,8 +627,8 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     parent_tracker_(parent_tracker),
     buffer_pool_client_(client),
     profile_(profile),
-    recvr_side_profile_(RuntimeProfile::Create(&pool_, "RecvrSide")),
-    sender_side_profile_(RuntimeProfile::Create(&pool_, "SenderSide")) {
+    dequeue_profile_(RuntimeProfile::Create(&pool_, "Dequeue")),
+    enqueue_profile_(RuntimeProfile::Create(&pool_, "Enqueue")) {
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
@@ -576,36 +638,47 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     sender_queues_.push_back(queue);
   }
 
-  // Add the receiver and sender sides' profiles as children of the owning exchange
-  // node's profile.
-  profile_->AddChild(recvr_side_profile_);
-  profile_->AddChild(sender_side_profile_);
+  // Add the profiles of the dequeuing side (i.e. GetBatch()) and the enqueuing side
+  // (i.e. AddBatchWork()) as children of the owning exchange node's profile.
+  profile_->AddChild(dequeue_profile_);
+  profile_->AddChild(enqueue_profile_);
+
+  // Initialize various counters for measuring dequeuing from queues.
+  bytes_dequeued_counter_ =
+      ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
+  bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+      dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
+  queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
+  data_wait_timer_ =
+      ADD_CHILD_TIMER(dequeue_profile_, "DataWaitTime", "TotalGetBatchTime");
+  inactive_timer_ = profile_->inactive_timer();
+  first_batch_wait_total_timer_ =
+      ADD_TIMER(dequeue_profile_, "FirstBatchWaitTime");
 
-  // Initialize the counters
+  // Initialize various counters for measuring enqueuing into queues.
   bytes_received_counter_ =
-      ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES);
+      ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
   bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
-      recvr_side_profile_, "BytesReceived", bytes_received_counter_);
-  queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime");
-  data_arrival_timer_ =
-      ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime");
-  inactive_timer_ = profile_->inactive_timer();
-  first_batch_wait_total_timer_ =
-      ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime");
+      enqueue_profile_, "BytesReceived", bytes_received_counter_);
   deserialize_row_batch_timer_ =
-      ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime");
-  num_early_senders_ =
-      ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
-  num_arrived_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
-  num_received_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
-  num_enqueued_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
-  num_deferred_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
-  num_eos_received_ =
-      ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
+      ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
+  total_eos_received_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalEosReceived", TUnit::UNIT);
+  total_early_senders_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalEarlySenders", TUnit::UNIT);
+  total_received_batches_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalBatchesReceived", TUnit::UNIT);
+  total_enqueued_batches_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalBatchesEnqueued", TUnit::UNIT);
+  total_deferred_rpcs_counter_ =
+      ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
+  deferred_rpcs_time_series_counter_ =
+      enqueue_profile_->AddTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
+      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+  total_has_deferred_rpcs_timer_ =
+      ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
+  dispatch_timer_ =
+      ADD_SUMMARY_STATS_TIMER(enqueue_profile_, "DispatchTime");
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -616,28 +689,30 @@ Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
 
 void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  MonoDelta duration(MonoTime::Now().GetDeltaSince(rpc_context->GetTimeReceived()));
+  dispatch_timer_->UpdateCounter(duration.ToNanoseconds());
   int use_sender_id = is_merging_ ? request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
   sender_queues_[use_sender_id]->AddBatch(request, response, rpc_context);
 }
 
-void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) {
+void KrpcDataStreamRecvr::ProcessDeferredRpc(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   // Add all batches to the same queue if is_merging_ is false.
-  sender_queues_[use_sender_id]->DequeueDeferredRpc();
+  sender_queues_[use_sender_id]->ProcessDeferredRpc();
 }
 
 void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
   int use_sender_id = is_merging_ ? ctx->request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
   sender_queues_[use_sender_id]->TakeOverEarlySender(move(ctx));
-  COUNTER_ADD(num_early_senders_, 1);
+  COUNTER_ADD(total_early_senders_counter_, 1);
 }
 
 void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   sender_queues_[use_sender_id]->DecrementSenders();
-  COUNTER_ADD(num_eos_received_, 1);
+  COUNTER_ADD(total_eos_received_counter_, 1);
 }
 
 void KrpcDataStreamRecvr::CancelStream() {
@@ -660,7 +735,8 @@ void KrpcDataStreamRecvr::Close() {
   // Given all queues have been cancelled and closed already at this point, it's safe to
   // call Close() on 'deferred_rpc_tracker_' without holding any lock here.
   deferred_rpc_tracker_->Close();
-  recvr_side_profile_->StopPeriodicCounters();
+  dequeue_profile_->StopPeriodicCounters();
+  enqueue_profile_->StopPeriodicCounters();
 
   // Remove reference to the unowned resources which may be freed after Close().
   mgr_ = nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 845dbf5..1435e44 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -145,7 +145,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// identified by 'sender_id'. If is_merging_ is false, it always defaults to
   /// queue 0; If is_merging_ is true, the sender queue is identified by 'sender_id_'.
   /// Called from KrpcDataStreamMgr's deserialization threads only.
-  void DequeueDeferredRpc(int sender_id);
+  void ProcessDeferredRpc(int sender_id);
 
   /// Takes over the RPC state 'ctx' of an early sender for deferred processing and
   /// kicks off a deserialization task to process it asynchronously. This makes sure
@@ -167,6 +167,9 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
     return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
   }
 
+  /// Return the current number of deferred RPCs.
+  int64_t num_deferred_rpcs() const { return num_deferred_rpcs_.Load(); }
+
   /// KrpcDataStreamMgr instance used to create this recvr. Not owned.
   KrpcDataStreamMgr* mgr_;
 
@@ -190,9 +193,12 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// from the fragment execution thread.
   bool closed_;
 
-  /// total number of bytes held across all sender queues.
+  /// Current number of bytes held across all sender queues.
   AtomicInt32 num_buffered_bytes_;
 
+  /// Current number of outstanding deferred RPCs across all sender queues.
+  AtomicInt64 num_deferred_rpcs_;
+
   /// Memtracker for payloads of deferred Rpcs in the sender queue(s). This must be
   /// accessed with a sender queue's lock held to avoid race with Close() of the queue.
   boost::scoped_ptr<MemTracker> deferred_rpc_tracker_;
@@ -217,60 +223,79 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   ObjectPool pool_;
 
   /// Runtime profile of the owning exchange node. It's the parent of
-  /// 'recvr_side_profile_' and 'sender_side_profile_'. Not owned.
+  /// 'dequeue_profile_' and 'enqueue_profile_'. Not owned.
   RuntimeProfile* profile_;
 
   /// Maintain two child profiles - receiver side measurements (from the GetBatch() path),
   /// and sender side measurements (from AddBatch()). These two profiles own all counters
   /// below unless otherwise noted. These profiles are owned by the receiver and placed
-  /// in 'pool_'. 'recvr_side_profile_' and 'sender_side_profile_' must outlive 'profile_'
+  /// in 'pool_'. 'dequeue_profile_' and 'enqueue_profile_' must outlive 'profile_'
   /// to prevent accessing freed memory during top-down traversal of 'profile_'. The
   /// receiver is co-owned by the exchange node and the data stream manager so these two
   /// profiles should outlive the exchange node which owns 'profile_'.
-  RuntimeProfile* recvr_side_profile_;
-  RuntimeProfile* sender_side_profile_;
+  RuntimeProfile* dequeue_profile_;
+  RuntimeProfile* enqueue_profile_;
 
-  /// Number of bytes received but not necessarily enqueued.
-  RuntimeProfile::Counter* bytes_received_counter_;
+  /// Pointer to profile's inactive timer. Not owned.
+  /// Not directly shown in the profile and thus data_wait_time_ below. Used for
+  /// subtracting the wait time from the total time spent in exchange node.
+  RuntimeProfile::Counter* inactive_timer_;
 
-  /// Time series of number of bytes received, samples bytes_received_counter_.
-  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
+  /// ------------------------------------------------------------------------------------
+  /// Following counters belong to 'dequeue_profile_'.
 
-  /// Total wall-clock time spent deserializing row batches.
-  RuntimeProfile::Counter* deserialize_row_batch_timer_;
+  /// Number of bytes of deserialized row batches dequeued.
+  RuntimeProfile::Counter* bytes_dequeued_counter_;
+
+  /// Time series of bytes of deserialized row batches, samples 'bytes_dequeued_counter_'.
+  RuntimeProfile::TimeSeriesCounter* bytes_dequeued_time_series_counter_;
+
+  /// Total wall-clock time spent in SenderQueue::GetBatch().
+  RuntimeProfile::Counter* queue_get_batch_timer_;
 
-  /// Number of senders which arrive before the receiver is ready.
-  RuntimeProfile::Counter* num_early_senders_;
+  /// Total wall-clock time spent waiting for data to be available in queues.
+  RuntimeProfile::Counter* data_wait_timer_;
 
-  /// Time spent waiting until the first batch arrives across all queues.
-  /// TODO: Turn this into a wall-clock timer.
+  /// Wall-clock time spent waiting for the first batch arrival across all queues.
   RuntimeProfile::Counter* first_batch_wait_total_timer_;
 
-  /// Total number of batches which arrived at this receiver but not necessarily received
-  /// or enqueued. An arrived row batch will eventually be received if there is no error
-  /// unpacking the RPC payload and the receiving stream is not cancelled.
-  RuntimeProfile::Counter* num_arrived_batches_;
+  /// ------------------------------------------------------------------------------------
+  /// Following counters belong to 'enqueue_profile_'.
 
-  /// Total number of batches received but not necessarily enqueued.
-  RuntimeProfile::Counter* num_received_batches_;
+  /// Total number of bytes of serialized row batches received.
+  RuntimeProfile::Counter* bytes_received_counter_;
 
-  /// Total number of batches received and enqueued into the row batch queue.
-  RuntimeProfile::Counter* num_enqueued_batches_;
+  /// Time series of number of bytes received, samples 'bytes_received_counter_'.
+  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
 
-  /// Total number of batches deferred because of early senders or full row batch queue.
-  RuntimeProfile::Counter* num_deferred_batches_;
+  /// Total wall-clock time spent deserializing row batches.
+  RuntimeProfile::Counter* deserialize_row_batch_timer_;
 
   /// Total number of EOS received.
-  RuntimeProfile::Counter* num_eos_received_;
+  RuntimeProfile::Counter* total_eos_received_counter_;
 
-  /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
-  RuntimeProfile::Counter* data_arrival_timer_;
+  /// Total number of senders which arrive before the receiver is ready.
+  RuntimeProfile::Counter* total_early_senders_counter_;
 
-  /// Pointer to profile's inactive timer. Not owned.
-  RuntimeProfile::Counter* inactive_timer_;
+  /// Total number of serialized row batches received.
+  RuntimeProfile::Counter* total_received_batches_counter_;
+
+  /// Total number of deserialized row batches enqueued into the row batch queues.
+  RuntimeProfile::Counter* total_enqueued_batches_counter_;
+
+  /// Total number of RPCs whose responses are deferred because of early senders or
+  /// full row batch queue.
+  RuntimeProfile::Counter* total_deferred_rpcs_counter_;
+
+  /// Time series of number of deferred row batches, samples 'num_deferred_rpcs_'.
+  RuntimeProfile::TimeSeriesCounter* deferred_rpcs_time_series_counter_;
+
+  /// Total wall-clock time in which the 'deferred_rpcs_' queues are not empty.
+  RuntimeProfile::Counter* total_has_deferred_rpcs_timer_;
 
-  /// Total time spent in SenderQueue::GetBatch().
-  RuntimeProfile::Counter* queue_get_batch_time_;
+  /// Summary stats of time which RPCs spent in KRPC service queue before
+  /// being dispatched to the RPC handlers.
+  RuntimeProfile::SummaryStatsCounter* dispatch_timer_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index bc6a07d..0f11dec 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -142,8 +142,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // This function blocks until the EOS RPC is complete.
   Status FlushAndSendEos(RuntimeState* state);
 
-  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
-
   // The type for a RPC worker function.
   typedef boost::function<Status()> DoRpcFn;
 
@@ -160,9 +158,6 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   const TUniqueId fragment_instance_id_;
   const PlanNodeId dest_node_id_;
 
-  // Number of bytes of all serialized row batches sent successfully.
-  int64_t num_data_bytes_sent_ = 0;
-
   // The row batch for accumulating rows copied from AddRow().
   // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
   scoped_ptr<RowBatch> batch_;
@@ -211,6 +206,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // The pointer to the current serialized row batch being sent.
   const OutboundRowBatch* rpc_in_flight_batch_ = nullptr;
 
+  // The monotonic time in nanoseconds of when current RPC started.
+  int64_t rpc_start_time_ns_ = 0;
+
   // True if there is an in-flight RPC.
   bool rpc_in_flight_ = false;
 
@@ -311,12 +309,14 @@ void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
   rpc_done_cv_.notify_one();
+  rpc_start_time_ns_ = 0;
 }
 
 Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) {
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
+  SCOPED_TIMER(parent_->profile()->inactive_timer());
   SCOPED_TIMER(parent_->state_->total_network_send_timer());
 
   // Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled.
@@ -379,17 +379,20 @@ void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn,
 }
 
 void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
+  DCHECK_NE(rpc_start_time_ns_, 0);
+  int64_t total_time = MonotonicNanos() - rpc_start_time_ns_;
   std::unique_lock<SpinLock> l(lock_);
   DCHECK(rpc_in_flight_);
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
+    DCHECK(rpc_in_flight_batch_ != nullptr);
+    COUNTER_ADD(parent_->bytes_sent_counter_,
+        RowBatch::GetSerializedSize(*rpc_in_flight_batch_));
+    int64_t network_time = total_time - resp_.receiver_latency_ns();
+    COUNTER_ADD(&parent_->total_network_timer_, network_time);
     Status rpc_status = Status::OK();
     int32_t status_code = resp_.status().status_code();
-    if (LIKELY(status_code == TErrorCode::OK)) {
-      DCHECK(rpc_in_flight_batch_ != nullptr);
-      num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
-      VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_;
-    } else if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
+    if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) {
       remote_recvr_closed_ = true;
     } else {
       rpc_status = Status(resp_.status());
@@ -429,6 +432,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
   req.set_tuple_offsets_sidecar_idx(sidecar_idx);
 
   // Add 'tuple_data_' as sidecar.
+  rpc_start_time_ns_ = MonotonicNanos();
   KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(
       RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx),
       "Unable to add tuple data to sidecar");
@@ -572,6 +576,7 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r
     sender_id_(sender_id),
     partition_type_(sink.output_partition.type),
     per_channel_buffer_size_(per_channel_buffer_size),
+    total_network_timer_(TUnit::TIME_NS, 0),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0) {
   DCHECK_GT(destinations.size(), 0);
@@ -605,6 +610,7 @@ KrpcDataStreamSender::~KrpcDataStreamSender() {
 
 Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
     const TDataSink& tsink, RuntimeState* state) {
+  SCOPED_TIMER(profile_->total_time_counter());
   DCHECK(tsink.__isset.stream_sink);
   if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
       partition_type_ == TPartitionType::KUDU) {
@@ -625,15 +631,17 @@ Status KrpcDataStreamSender::Prepare(
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
   rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
-  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  bytes_sent_counter_ = ADD_COUNTER(profile(), "TotalBytesSent", TUnit::BYTES);
+  bytes_sent_time_series_counter_ =
+      ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_);
+  network_throughput_counter_ =
+      profile()->AddDerivedCounter("NetworkThroughput", TUnit::BYTES_PER_SECOND,
+          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
+              &total_network_timer_));
   eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
-  overall_throughput_ =
-      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
-           bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                         profile()->total_time_counter()));
+  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsSent", TUnit::UNIT);
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state));
   }
@@ -641,17 +649,19 @@ Status KrpcDataStreamSender::Prepare(
 }
 
 Status KrpcDataStreamSender::Open(RuntimeState* state) {
+  SCOPED_TIMER(profile_->total_time_counter());
   return ScalarExprEvaluator::Open(partition_expr_evals_, state);
 }
 
 Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
   DCHECK(!closed_);
   DCHECK(!flushed_);
 
   if (batch->num_rows() == 0) return Status::OK();
   if (partition_type_ == TPartitionType::UNPARTITIONED) {
     OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
-    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch));
+    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
     // TransmitData() will block if there are still in-flight rpcs (and those will
     // reference the previously written serialized batch).
     for (int i = 0; i < channels_.size(); ++i) {
@@ -736,6 +746,7 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
 }
 
 Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
   DCHECK(!flushed_);
   DCHECK(!closed_);
   flushed_ = true;
@@ -749,12 +760,14 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
 }
 
 void KrpcDataStreamSender::Close(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
   if (closed_) return;
   for (int i = 0; i < channels_.size(); ++i) {
     channels_[i]->Teardown(state);
   }
   ScalarExprEvaluator::Close(partition_expr_evals_, state);
   ScalarExpr::Close(partition_exprs_);
+  profile()->StopPeriodicCounters();
   DataSink::Close(state);
 }
 
@@ -762,25 +775,16 @@ Status KrpcDataStreamSender::SerializeBatch(
     RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
   VLOG_ROW << "serializing " << src->num_rows() << " rows";
   {
-    SCOPED_TIMER(profile_->total_time_counter());
     SCOPED_TIMER(serialize_batch_timer_);
     RETURN_IF_ERROR(src->Serialize(dest));
-    int64_t bytes = RowBatch::GetSerializedSize(*dest);
     int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
-    COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
   }
   return Status::OK();
 }
 
 int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
-  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
-  // atomic?
-  int64_t result = 0;
-  for (int i = 0; i < channels_.size(); ++i) {
-    result += channels_[i]->num_data_bytes_sent();
-  }
-  return result;
+  return bytes_sent_counter_->value();
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index bc18574..e6c6ccf 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,20 +157,28 @@ class KrpcDataStreamSender : public DataSink {
   /// Total number of times RPC fails or the remote responds with a non-retryable error.
   RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
 
-  /// Total number of bytes sent.
+  /// Total number of bytes sent. Updated on RPC completion.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
+  /// Time series of number of bytes sent, samples bytes_sent_counter_.
+  RuntimeProfile::TimeSeriesCounter* bytes_sent_time_series_counter_ = nullptr;
+
   /// Total number of EOS sent.
   RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
 
-  /// Total number of bytes of the row batches before compression.
+  /// Total number of bytes of row batches before compression.
   RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
 
   /// Total number of rows sent.
   RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
 
-  /// Throughput per total time spent in sender
-  RuntimeProfile::Counter* overall_throughput_ = nullptr;
+  /// Approximate network throughput for sending row batches.
+  RuntimeProfile::Counter* network_throughput_counter_ = nullptr;
+
+  /// Aggregated time spent in network (including queuing time in KRPC transfer queue)
+  /// for transmitting the RPC requests and receiving the responses. Used for computing
+  /// 'network_throughput_'. Not too meaningful by itself so not shown in profile.
+  RuntimeProfile::Counter total_network_timer_;
 
   /// Identifier of the destination plan node.
   PlanNodeId dest_node_id_;

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b292789..7edd718 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -356,7 +356,11 @@ class RuntimeState {
   /// Total time waiting in storage (across all threads)
   RuntimeProfile::Counter* total_storage_wait_timer_;
 
-  /// Total time spent sending over the network (across all threads)
+  /// Total time spent waiting for RPCs to complete. This time is a combination of:
+  /// - network time of sending the RPC payload to the destination
+  /// - processing and queuing time in the destination
+  /// - network time of sending the RPC response to the originating node
+  /// TODO: rename this counter and account for the 3 components above. IMPALA-6705.
   RuntimeProfile::Counter* total_network_send_timer_;
 
   /// Total time spent receiving over the network (across all threads)

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index d94837b..1d42a99 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/util/monotime.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
@@ -36,6 +37,8 @@
 #include "common/names.h"
 
 using kudu::rpc::RpcContext;
+using kudu::MonoDelta;
+using kudu::MonoTime;
 
 static const string queue_limit_msg = "(Advanced) Limit on RPC payloads consumption for "
     "DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
@@ -82,13 +85,27 @@ void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
 }
 
 template<typename ResponsePBType>
+void DataStreamService::RespondRpc(const Status& status,
+    ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
+  MonoDelta duration(MonoTime::Now().GetDeltaSince(ctx->GetTimeReceived()));
+  status.ToProto(response->mutable_status());
+  response->set_receiver_latency_ns(duration.ToNanoseconds());
+  ctx->RespondSuccess();
+}
+
+template<typename ResponsePBType>
 void DataStreamService::RespondAndReleaseRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker) {
   mem_tracker->Release(ctx->GetTransferSize());
-  status.ToProto(response->mutable_status());
-  ctx->RespondSuccess();
+  RespondRpc(status, response, ctx);
 }
 
+template void DataStreamService::RespondRpc(const Status& status,
+    TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx);
+
+template void DataStreamService::RespondRpc(const Status& status,
+    EndDataStreamResponsePB* response, kudu::rpc::RpcContext* ctx);
+
 template void DataStreamService::RespondAndReleaseRpc(const Status& status,
     TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx,
     MemTracker* mem_tracker);

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 812fb2c..e233165 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -62,6 +62,11 @@ class DataStreamService : public DataStreamServiceIf {
   static void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
       kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
 
+  /// Respond to a RPC passed in 'response'/'ctx' with 'status'. Takes ownership of 'ctx'.
+  template<typename ResponsePBType>
+  static void RespondRpc(const Status& status, ResponsePBType* response,
+      kudu::rpc::RpcContext* ctx);
+
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/3855cb0f/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index c2045d2..854eb87 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -49,6 +49,9 @@ message TransmitDataRequestPB {
 message TransmitDataResponsePB {
   // Status::OK() on success; Error status on failure.
   optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
 }
 
 // All fields are required in V1.
@@ -66,6 +69,9 @@ message EndDataStreamRequestPB {
 // All fields are required in V1.
 message EndDataStreamResponsePB {
   optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
 }
 
 // Handles data transmission between fragment instances.


[04/12] impala git commit: IMPALA-5721, IMPALA-6717, IMPALA-6738: improve stress test binary search

Posted by ph...@apache.org.
IMPALA-5721,IMPALA-6717,IMPALA-6738: improve stress test binary search

IMPALA-5721:
- Save profiles of queries at the end of both the spilling and
  non-spilling binary search. These were not being saved before. Note
  these profiles won't have ExecSummary until IMPALA-6640 is addressed.

- Save the profile of any query that produces incorrect results during
  binary search. These were not being saved before, either.

- Use descriptive names, like
  tpch_100_parquet_q12_profile_without_spilling.txt, for profiles
  mentioned above. We do this by introducing the concept of a
  "logical_query_id" whose values look like "tpch_100_parquet_q12".

- Use the logical_query_id in critical error paths and include the
  logical_query_id in result hash files.

IMPALA-6717:
- Plumb --common-query-options through to the binary search.

IMPALA-6738:
- Begin a refactoring to reduce the number of parameters used when doing
  the binary search.

- Introduce a notion of "converted args" via class that does the
  conversion (if needed) via property getters.

- Adjust populate_all_queries() to use converted_args

Change-Id: I33d036ec93df3016cd4703205078dbdba0168acb
Reviewed-on: http://gerrit.cloudera.org:8080/9770
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/382d7793
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/382d7793
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/382d7793

Branch: refs/heads/2.x
Commit: 382d7793955aa546082fdf0d7b28e176e22db99f
Parents: ee05cf5
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Feb 28 11:02:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 289 ++++++++++++++++++++++++---------
 1 file changed, 213 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/382d7793/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..44d7b34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,17 +62,19 @@ import sys
 import threading
 import traceback
 from Queue import Empty   # Must be before Queue below
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
 from collections import defaultdict
 from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
-from random import choice, random, randrange
+from random import choice, random, randrange, shuffle
 from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
 from time import sleep, time
 
+import tests.comparison.cli_options as cli_options
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
 from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
@@ -110,6 +112,83 @@ RESULT_HASHES_DIR = "result_hashes"
 RUNTIME_INFO_FILE_VERSION = 3
 
 
+class StressArgConverter(object):
+  def __init__(self, args):
+    """
+    Convert arguments as returned from from argparse parse_args() into internal forms.
+
+    The purpose of this object is to do any conversions needed from the type given by
+    parge_args() into internal forms. For example, if a commandline option takes in a
+    complicated string that needs to be converted into a list or dictionary, this is the
+    place to do it. Access works the same as on the object returned by parse_args(),
+    i.e., object.option_attribute.
+
+    In most cases, simple arguments needn't be converted, because argparse handles the
+    type conversion already, and in most cases, type conversion (e.g., "8" <str> to 8
+    <int>) is all that's needed. If a property getter below doesn't exist, it means the
+    argument value is just passed along unconverted.
+
+    Params:
+      args: argparse.Namespace object (from argparse.ArgumentParser().parse_args())
+    """
+    assert isinstance(args, Namespace), "expected Namespace, got " + str(type(args))
+    self._args = args
+    self._common_query_options = None
+
+  def __getattr__(self, attr):
+    # This "proxies through" all the attributes from the Namespace object that are not
+    # defined in this object via property getters below.
+    return getattr(self._args, attr)
+
+  @property
+  def common_query_options(self):
+    # Memoize this, as the integrity checking of --common-query-options need only
+    # happen once.
+    if self._common_query_options is not None:
+      return self._common_query_options
+    # The stress test sets these, so callers cannot override them.
+    IGNORE_QUERY_OPTIONS = frozenset([
+        'ABORT_ON_ERROR',
+        'MEM_LIMIT',
+    ])
+    common_query_options = {}
+    if self._args.common_query_options is not None:
+      for query_option_and_value in self._args.common_query_options:
+        try:
+          query_option, value = query_option_and_value.split('=')
+        except ValueError:
+          LOG.error(
+              "Could not parse --common-query-options: '{common_query_options}'".format(
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        query_option = query_option.upper()
+        if query_option in common_query_options:
+          LOG.error(
+              "Query option '{query_option}' already defined in --common-query-options: "
+              "'{common_query_options}'".format(
+                  query_option=query_option,
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        elif query_option in IGNORE_QUERY_OPTIONS:
+          LOG.warn(
+              "Ignoring '{query_option}' in common query options: '{opt}': "
+              "The stress test algorithm needs control of this option.".format(
+                  query_option=query_option, opt=self._args.common_query_options))
+        else:
+          common_query_options[query_option] = value
+          LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+              query_option=query_option, value=value))
+    self._common_query_options = common_query_options
+    return self._common_query_options
+
+  @property
+  def runtime_info_path(self):
+    runtime_info_path = self._args.runtime_info_path
+    if "{cm_host}" in runtime_info_path:
+      runtime_info_path = runtime_info_path.format(cm_host=self._args.cm_host)
+    return runtime_info_path
+
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -169,7 +248,9 @@ def print_crash_info_if_exists(impala, start_time):
 class QueryReport(object):
   """Holds information about a single query run."""
 
-  def __init__(self):
+  def __init__(self, query):
+    self.query = query
+
     self.result_hash = None
     self.runtime_secs = None
     self.mem_was_spilled = False
@@ -180,6 +261,32 @@ class QueryReport(object):
     self.profile = None
     self.query_id = None
 
+  def write_query_profile(self, directory, prefix=None):
+    """
+    Write out the query profile bound to this object to a given directory.
+
+    The file name is generated and will contain the query ID. Use the optional prefix
+    parameter to set a prefix on the filename.
+
+    Example return:
+      tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt
+
+    Parameters:
+      directory (str): Directory to write profile.
+      prefix (str): Prefix for filename.
+    """
+    if not (self.profile and self.query_id):
+      return
+    if prefix is not None:
+      file_name = prefix + '_'
+    else:
+      file_name = ''
+    file_name += self.query.logical_query_id + '_'
+    file_name += self.query_id.replace(":", "_") + "_profile.txt"
+    profile_log_path = os.path.join(directory, file_name)
+    with open(profile_log_path, "w") as profile_log:
+      profile_log.write(self.profile)
+
 
 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
@@ -632,14 +739,18 @@ class StressRunner(object):
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
-          self._write_query_profile(report)
-          raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
+          self._write_query_profile(report, PROFILES_DIR, prefix='error')
+          raise Exception("Query {query} ID {id} failed: {mesg}".format(
+              query=query.logical_query_id,
+              id=report.query_id,
+              mesg=error_msg))
         if (
             report.mem_limit_exceeded and
             not self._mem_broker.was_overcommitted(reservation_id)
         ):
           increment(self._num_successive_errors)
-          self._write_query_profile(report)
+          self._write_query_profile(
+              report, PROFILES_DIR, prefix='unexpected_mem_exceeded')
           raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
                           "Query ID: {0}".format(report.query_id))
         if (
@@ -649,18 +760,19 @@ class StressRunner(object):
         ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
           raise Exception(dedent("""\
                                  Result hash mismatch; expected {expected}, got {actual}
                                  Query ID: {id}
                                  Query: {query}""".format(expected=query.result_hash,
                                                           actual=report.result_hash,
                                                           id=report.query_id,
-                                                          query=query.sql)))
+                                                          query=query.logical_query_id)))
         if report.timed_out and not should_cancel:
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='timed_out')
           raise Exception(
-              "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
+              "Query {query} unexpectedly timed out. Query ID: {id}".format(
+                  query=query.logical_query_id, id=report.query_id))
         self._num_successive_errors.value = 0
 
   def _print_status_header(self):
@@ -706,13 +818,10 @@ class StressRunner(object):
     if report.timed_out:
       increment(self._num_queries_timedout)
 
-  def _write_query_profile(self, report):
-    if not (report.profile and report.query_id):
-      return
-    file_name = report.query_id.replace(":", "_") + "_profile.txt"
-    profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
-    with open(profile_log_path, "w") as profile_log:
-      profile_log.write(report.profile)
+  def _write_query_profile(self, report, subdir, prefix=None):
+    report.write_query_profile(
+        os.path.join(self.results_dir, subdir),
+        prefix)
 
   def _check_successive_errors(self):
     if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
@@ -807,6 +916,8 @@ class Query(object):
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
+    self.solo_runtime_profile_with_spilling = None
+    self.solo_runtime_profile_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
     # Query options to set before running the query.
@@ -818,6 +929,8 @@ class Query(object):
     # UPSERT, DELETE.
     self.query_type = QueryType.SELECT
 
+    self._logical_query_id = None
+
   def __repr__(self):
     return dedent("""
         <Query
@@ -832,6 +945,29 @@ class Query(object):
         Population order: %(population_order)r>
         """.strip() % self.__dict__)
 
+  @property
+  def logical_query_id(self):
+    """
+    Return a meanginful unique str identifier for the query.
+
+    Example: "tpcds_300_decimal_parquet_q21"
+    """
+    if self._logical_query_id is None:
+      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+    return self._logical_query_id
+
+  def write_runtime_info_profiles(self, directory):
+    """Write profiles for spilling and non-spilling into directory (str)."""
+    profiles_to_write = [
+        (self.logical_query_id + "_profile_with_spilling.txt",
+         self.solo_runtime_profile_with_spilling),
+        (self.logical_query_id + "_profile_without_spilling.txt",
+         self.solo_runtime_profile_without_spilling),
+    ]
+    for filename, profile in profiles_to_write:
+      with open(os.path.join(directory, filename), "w") as fh:
+        fh.write(profile)
+
 
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
@@ -856,7 +992,7 @@ class QueryRunner(object):
       self.impalad_conn = None
 
   def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
-                should_cancel=False):
+                should_cancel=False, retain_profile=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
@@ -868,7 +1004,7 @@ class QueryRunner(object):
       raise Exception("connect() must first be called")
 
     timeout_unix_time = time() + timeout_secs
-    report = QueryReport()
+    report = QueryReport(query)
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
@@ -914,7 +1050,8 @@ class QueryRunner(object):
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-              if query.result_hash and report.result_hash != query.result_hash:
+              if retain_profile or \
+                 query.result_hash and report.result_hash != query.result_hash:
                 fetch_and_set_profile(cursor, report)
             except QueryTimeout:
               self._cancel(cursor, report)
@@ -1004,7 +1141,7 @@ class QueryRunner(object):
     def hash_result_impl():
       result_log = None
       try:
-        file_name = query_id.replace(":", "_")
+        file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
         if query.result_hash is None:
           file_name += "_initial"
         file_name += "_results.txt"
@@ -1159,7 +1296,8 @@ def populate_runtime_info_for_random_queries(
 
 def populate_runtime_info(
     query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
+    common_query_options=None
 ):
   """Runs the given query by itself repeatedly until the minimum memory is determined
   with and without spilling. Potentially all fields in the Query class (except
@@ -1177,6 +1315,7 @@ def populate_runtime_info(
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
+  runner.common_query_options = common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
   runner.use_kerberos = use_kerberos
@@ -1191,6 +1330,8 @@ def populate_runtime_info(
   old_required_mem_mb_without_spilling = query.required_mem_mb_without_spilling
   old_required_mem_mb_with_spilling = query.required_mem_mb_with_spilling
 
+  profile_error_prefix = query.logical_query_id + "_binsearch_error"
+
   # TODO: This method is complicated enough now that breaking it out into a class may be
   # helpful to understand the structure.
 
@@ -1203,30 +1344,45 @@ def populate_runtime_info(
       ):
         query.required_mem_mb_with_spilling = required_mem
         query.solo_runtime_secs_with_spilling = report.runtime_secs
+        query.solo_runtime_profile_with_spilling = report.profile
     elif (
         query.required_mem_mb_without_spilling is None or
         required_mem < query.required_mem_mb_without_spilling
     ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
+      query.solo_runtime_profile_without_spilling = report.profile
 
   def get_report(desired_outcome=None):
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
+      report = runner.run_query(query, timeout_secs, mem_limit,
+                                run_set_up=True, retain_profile=True)
       if report.timed_out:
-        raise QueryTimeout()
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise QueryTimeout(
+            "query {0} timed out during binary search".format(query.logical_query_id))
       if report.non_mem_limit_error:
-        raise report.non_mem_limit_error
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise Exception(
+            "query {0} errored during binary search: {1}".format(
+                query.logical_query_id, str(report.non_mem_limit_error)))
       LOG.debug("Spilled: %s" % report.mem_was_spilled)
       if not report.mem_limit_exceeded:
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
+          report.write_query_profile(
+              os.path.join(results_dir, PROFILES_DIR),
+              profile_error_prefix)
           raise Exception(
-              "Result hash mismatch; expected %s, got %s" %
-              (query.result_hash, report.result_hash))
+              "Result hash mismatch for query %s; expected %s, got %s" %
+              (query.logical_query_id, query.result_hash, report.result_hash))
 
       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1332,8 +1488,16 @@ def populate_runtime_info(
       upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
+        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+          # IMPALA-6604: A fair amount of queries go down this path.
+          LOG.info(
+              "Unable to find a memory limit with spilling within the threshold of {0} "
+              "MB. Using the same memory limit for both.".format(
+                  MEM_LIMIT_EQ_THRESHOLD_MB))
         query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+        query.solo_runtime_profile_with_spilling = \
+            query.solo_runtime_profile_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
   if (
@@ -1349,6 +1513,7 @@ def populate_runtime_info(
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+    query.solo_runtime_profile_with_spilling = query.solo_runtime_profile_without_spilling
   LOG.debug("Query after populating runtime info: %s", query)
 
 
@@ -1420,9 +1585,12 @@ def save_runtime_info(path, query, impala):
     class JsonEncoder(json.JSONEncoder):
       def default(self, obj):
         data = dict(obj.__dict__)
-        # Queries are stored by sql, so remove the duplicate data.
-        if "sql" in data:
-          del data["sql"]
+        # Queries are stored by sql, so remove the duplicate data. Also don't store
+        # profiles as JSON values, but instead separately.
+        for k in ("sql", "solo_runtime_profile_with_spilling",
+                  "solo_runtime_profile_without_spilling"):
+          if k in data:
+            del data[k]
         return data
     json.dump(
         store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
@@ -1690,9 +1858,12 @@ def reset_databases(cursor):
                   " exist in '{1}' database.".format(table_name, cursor.db_name))
 
 
-def populate_all_queries(queries, impala, args, runtime_info_path,
-                         queries_with_runtime_info_by_db_sql_and_options):
+def populate_all_queries(
+    queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
+):
   """Populate runtime info for all queries, ordered by the population_order property."""
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1712,9 +1883,13 @@ def populate_all_queries(queries, impala, args, runtime_info_path,
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
         populate_runtime_info(
-            query, impala, args.use_kerberos, args.results_dir,
-            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+            query, impala, converted_args.use_kerberos, converted_args.results_dir,
+            samples=converted_args.samples,
+            max_conflicting_samples=converted_args.max_conflicting_samples,
+            common_query_options=common_query_options)
         save_runtime_info(runtime_info_path, query, impala)
+        query.write_runtime_info_profiles(
+            os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
   return result
 
@@ -1745,10 +1920,6 @@ def print_version(cluster):
 
 
 def main():
-  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
-  from random import shuffle
-  import tests.comparison.cli_options as cli_options
-
   parser = ArgumentParser(
       epilog=dedent("""
       Before running this script a CM cluster must be setup and any needed data
@@ -1891,6 +2062,9 @@ def main():
       "or set valid values. Example: --common-query-options "
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
+  converted_args = StressArgConverter(args)
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
 
   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -1906,40 +2080,6 @@ def main():
         "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
-  # The stress test sets these, so callers cannot override them.
-  IGNORE_QUERY_OPTIONS = frozenset([
-      'ABORT_ON_ERROR',
-      'MEM_LIMIT',
-  ])
-
-  common_query_options = {}
-  if args.common_query_options is not None:
-    for query_option_and_value in args.common_query_options:
-      try:
-        query_option, value = query_option_and_value.split('=')
-      except ValueError:
-        LOG.error(
-            "Could not parse --common-query-options: '{common_query_options}'".format(
-                common_query_options=args.common_query_options))
-        exit(1)
-      query_option = query_option.upper()
-      if query_option in common_query_options:
-        LOG.error(
-            "Query option '{query_option}' already defined in --common-query-options: "
-            "'{common_query_options}'".format(
-                query_option=query_option,
-                common_query_options=args.common_query_options))
-        exit(1)
-      elif query_option in IGNORE_QUERY_OPTIONS:
-        LOG.warn(
-            "Ignoring '{query_option}' in common query options: '{opt}': "
-            "The stress test algorithm needs control of this option.".format(
-                query_option=query_option, opt=args.common_query_options))
-      else:
-        common_query_options[query_option] = value
-        LOG.debug("Common query option '{query_option}' set to '{value}'".format(
-            query_option=query_option, value=value))
-
   os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
   os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
 
@@ -1956,9 +2096,6 @@ def main():
     raise Exception("Queries are currently running on the cluster")
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
 
-  runtime_info_path = args.runtime_info_path
-  if "{cm_host}" in runtime_info_path:
-    runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
       runtime_info_path, impala)
 
@@ -2026,8 +2163,8 @@ def main():
       with impala.cursor(db_name=database) as cursor:
         reset_databases(cursor)
 
-  queries = populate_all_queries(queries, impala, args, runtime_info_path,
-                                 queries_with_runtime_info_by_db_sql_and_options)
+  queries = populate_all_queries(
+      queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options)
 
   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries needs to be validated. Since the


[03/12] impala git commit: IMPALA-6759: align stress test memory estimation parse pattern

Posted by ph...@apache.org.
IMPALA-6759: align stress test memory estimation parse pattern

The stress test never expected to see memory estimates on the order of
PB. Apparently it can happen with TPC DS 10000, so update the pattern.

It's not clear how to quickly write a test to catch this, because it
involves crossing language boundaries and possibly having a
massively-scaled dataset. I think leaving a comment in both places is
good enough for now.

Change-Id: I317c271888584ed2a817ee52ad70267eae64d341
Reviewed-on: http://gerrit.cloudera.org:8080/9846
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ee05cf5d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ee05cf5d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ee05cf5d

Branch: refs/heads/2.x
Commit: ee05cf5db76006812d4a3c947d39ccc583031ee1
Parents: 009564a
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Mar 28 15:14:20 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000

----------------------------------------------------------------------
 fe/src/main/java/org/apache/impala/common/PrintUtils.java |  2 ++
 tests/stress/concurrent_select.py                         |  8 +++++---
 tests/util/parse_util.py                                  | 10 ++++++----
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ee05cf5d/fe/src/main/java/org/apache/impala/common/PrintUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 77d77dd..9f75134 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -39,6 +39,8 @@ public class PrintUtils {
   public static String printBytes(long bytes) {
     double result = bytes;
     // Avoid String.format() due to IMPALA-1572 which happens on JDK7 but not JDK6.
+    // IMPALA-6759: Please update tests/stress/concurrent_select.py MEM_ESTIMATE_PATTERN
+    // if you add additional unit prefixes.
     if (bytes >= PETABYTE) return new DecimalFormat(".00PB").format(result / PETABYTE);
     if (bytes >= TERABYTE) return new DecimalFormat(".00TB").format(result / TERABYTE);
     if (bytes >= GIGABYTE) return new DecimalFormat(".00GB").format(result / GIGABYTE);

http://git-wip-us.apache.org/repos/asf/impala/blob/ee05cf5d/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index fa8541c..a4bffd9 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -98,8 +98,10 @@ MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
 MEM_LIMIT_EQ_THRESHOLD_MB = 50
 
 # Regex to extract the estimated memory from an explain plan.
+# The unit prefixes can be found in
+# fe/src/main/java/org/apache/impala/common/PrintUtils.java
 MEM_ESTIMATE_PATTERN = re.compile(
-    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(T|G|M|K)?B")
+    r"Per-Host Resource Estimates: Memory=(\d+.?\d*)(P|T|G|M|K)?B")
 
 PROFILES_DIR = "profiles"
 RESULT_HASHES_DIR = "result_hashes"
@@ -1359,8 +1361,8 @@ def match_memory_estimate(explain_lines):
     explain_lines: list of str
 
   Returns:
-    2-tuple str of memory limit in decimal string and units (one of 'T', 'G', 'M', 'K',
-    '' bytes)
+    2-tuple str of memory limit in decimal string and units (one of 'P', 'T', 'G', 'M',
+    'K', '' bytes)
 
   Raises:
     Exception if no match found

http://git-wip-us.apache.org/repos/asf/impala/blob/ee05cf5d/tests/util/parse_util.py
----------------------------------------------------------------------
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index ad40b68..6869489 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -56,15 +56,17 @@ def parse_mem_to_mb(mem, units):
   if units.endswith("B"):
     units = units[:-1]
   if not units:
-    mem /= 10 ** 6
+    mem /= 2 ** 20
   elif units == "K":
-    mem /= 10 ** 3
+    mem /= 2 ** 10
   elif units == "M":
     pass
   elif units == "G":
-    mem *= 10 ** 3
+    mem *= 2 ** 10
   elif units == "T":
-    mem *= 10 ** 6
+    mem *= 2 ** 20
+  elif units == "P":
+    mem *= 2 ** 30
   else:
     raise Exception('Unexpected memory unit "%s"' % units)
   return int(mem)


[11/12] impala git commit: IMPALA-6760: Fix for py2.7-ism in run-tests.py.

Posted by ph...@apache.org.
IMPALA-6760: Fix for py2.7-ism in run-tests.py.

A set-literal snuck into run-tests.py in a recent
change. We wish to avoid these to be able to run on
py2.6.

Change-Id: I81928d1880a493b91abb13b3a8149568c9789f66
Reviewed-on: http://gerrit.cloudera.org:8080/9843
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Philip Zeyliger <ph...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/25b2344f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/25b2344f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/25b2344f

Branch: refs/heads/2.x
Commit: 25b2344f9315e74ae8d93bed50c90199945f43b3
Parents: e882cbb
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Wed Mar 28 15:15:42 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Mar 30 01:48:38 2018 +0000

----------------------------------------------------------------------
 tests/run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/25b2344f/tests/run-tests.py
----------------------------------------------------------------------
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 6967b17..9552d0d 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -80,7 +80,7 @@ class TestCounterPlugin(object):
   # https://docs.pytest.org/en/2.9.2/writing_plugins.html#_pytest.hookspec.pytest_collection_modifyitems
   def pytest_collection_modifyitems(self, items):
       for item in items:
-          self.tests_collected.update({item.nodeid})
+          self.tests_collected.add(item.nodeid)
 
   # link to pytest_runtest_logreport
   # https://docs.pytest.org/en/2.9.2/_modules/_pytest/hookspec.html#pytest_runtest_logreport


[06/12] impala git commit: IMPALA-6747: Automate diagnostics collection.

Posted by ph...@apache.org.
IMPALA-6747: Automate diagnostics collection.

This commit adds the necessary tooling to automate diagnostics
collection for Impala daemons. Following diagnostics are supported.

1. Native core dump (+ shared libs)
2. GDB/Java thread dump (pstack + jstack)
3. Java heap dump (jmap)
4. Minidumps (using breakpad) *
5. Profiles

Given the required inputs, the script outputs a zip compressed
impala diagnostic bundle with all the diagnostics collected.

The script can be run manually with the following command.

python collect_diagnostics.py --help

* minidumps collected here correspond to the state of the Impala
process at the time this script is triggered. This is different
from collect_minidumps.py which archives the entire minidump
directory.

Change-Id: Ib29caec7c3be5b6a31e60461294979c318300f64
Reviewed-on: http://gerrit.cloudera.org:8080/9815
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/25126e8e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/25126e8e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/25126e8e

Branch: refs/heads/2.x
Commit: 25126e8e4df2ed3ff518f1f7faac9ddd32a7d1de
Parents: 792dcba
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Mon Dec 4 13:38:09 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000

----------------------------------------------------------------------
 bin/diagnostics/__init__.py            |   0
 bin/diagnostics/collect_diagnostics.py | 518 ++++++++++++++++++++++++++++
 bin/diagnostics/collect_shared_libs.sh |  52 +++
 bin/rat_exclude_files.txt              |   1 +
 tests/unittests/test_command.py        |  49 +++
 5 files changed, 620 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/25126e8e/bin/diagnostics/__init__.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/__init__.py b/bin/diagnostics/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/impala/blob/25126e8e/bin/diagnostics/collect_diagnostics.py
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_diagnostics.py b/bin/diagnostics/collect_diagnostics.py
new file mode 100644
index 0000000..6abc30a
--- /dev/null
+++ b/bin/diagnostics/collect_diagnostics.py
@@ -0,0 +1,518 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import datetime
+import errno
+import getpass
+import glob
+import logging
+import math
+import os
+import shutil
+import subprocess
+import sys
+import time
+import tempfile
+import traceback
+
+from collections import namedtuple
+from struct import Struct
+from threading import Timer
+
+# This script is for automating the collection of following diagnostics from a host
+# running an Impala service daemon (catalogd/statestored/impalad). Following diagnostics
+# are supported.
+#
+# 1. Native core dump (+ shared libs)
+# 2. GDB/Java thread dump (pstack + jstack)
+# 3. Java heap dump (jmap)
+# 4. Minidumps (using breakpad)
+# 5. Profiles
+#
+# Dependencies:
+# 1. gdb package should be installed to collect native thread stacks/coredump. The binary
+#    location is picked up from the system path. In case of pstacks, the script falls back
+#    to the breakpad minidumps if the 'pstack' binary is not in system path.
+# 2. jstack/jmap from a JRE/JDK. Default location is picked up from system path but can be
+#    overriden with --java_home PATH_TO_JAVA_HOME.
+# 3. Mindumps are collected by sending a SIGUSR1 signal to the Impala process. Impala
+#    versions without full breakpad support (<= release 2.6) will reliably crash if
+#    we attempt to do that since those versions do not have the corresponding signal
+#    handler. Hence it is suggested to run this script only on releases 2.7 and later.
+#
+# Usage: python collect_diagnostics.py --help
+#
+# Few example usages:
+#
+# Collect 3 jstacks, pstacks from an impalad process 3s apart.
+#  python collect_diagnostics.py --pid $(pidof impalad) --stacks 3 3
+#
+# Collect core dump and a Java heapdump from the catalogd process
+#  python collect_diagnostics.py --pid $(pidof impalad) --jmap --gcore
+#
+# Collect 5 breakpad minidumps from a statestored process 5s apart.
+#  python collect_diagnostics.py --pid $(pidof statestored) --minidumps 5 5
+#      --minidumps_dir /var/log/statestored/minidumps
+#
+#
+class Command(object):
+  """Wrapper around subprocess.Popen() that is canceled after a configurable timeout."""
+  def __init__(self, cmd, timeout=30):
+    self.cmd = cmd
+    self.timeout = timeout
+    self.child_killed_by_timeout = False
+
+  def run(self, cmd_stdin=None, cmd_stdout=subprocess.PIPE):
+    """Runs the command 'cmd' by setting the appropriate stdin/out. The command is killed
+    if hits a timeout (controlled by self.timeout)."""
+    cmd_string = " ".join(self.cmd)
+    logging.info("Starting command %s with a timeout of %s"
+        % (cmd_string, str(self.timeout)))
+    self.child = subprocess.Popen(self.cmd, stdin=cmd_stdin, stdout=cmd_stdout)
+    timer = Timer(self.timeout, self.kill_child)
+    try:
+      timer.start()
+      # self.stdout is set to None if cmd_stdout is anything other than PIPE. The actual
+      # stdout is written to the file corresponding to cmd_stdout.
+      self.stdout = self.child.communicate()[0]
+      if self.child.returncode == 0:
+        logging.info("Command finished successfully: " + cmd_string)
+      else:
+        cmd_status = "timed out" if self.child_killed_by_timeout else "failed"
+        logging.error("Command %s: %s" % (cmd_status, cmd_string))
+      return self.child.returncode
+    finally:
+      timer.cancel()
+    return -1
+
+  def kill_child(self):
+    """Kills the running command (self.child)."""
+    self.child_killed_by_timeout = True
+    self.child.kill()
+
+class ImpalaDiagnosticsHandler(object):
+  IMPALA_PROCESSES = ["impalad", "catalogd", "statestored"]
+  OUTPUT_DIRS_TO_CREATE = ["stacks", "gcores", "jmaps", "profiles",
+      "shared_libs", "minidumps"]
+  MINIDUMP_HEADER = namedtuple("MDRawHeader", "signature version stream_count \
+      stream_directory_rva checksum time_date_stamp flags")
+
+  def __init__(self, args):
+    """Initializes the state by setting the paths of required executables."""
+    self.args = args
+    if args.pid <= 0:
+      return
+
+    self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
+    # Name of the Impala process for which diagnostics should be collected.
+    self.target_process_name = self.get_target_process_name()
+
+    self.java_home = self.get_java_home_from_env()
+    if not self.java_home and args.java_home:
+      self.java_home = os.path.abspath(args.java_home)
+    self.jstack_cmd = os.path.join(self.java_home, "bin/jstack")
+    self.java_cmd = os.path.join(self.java_home, "bin/java")
+    self.jmap_cmd = os.path.join(self.java_home, "bin/jmap")
+
+    self.gdb_cmd = self.get_command_from_path("gdb")
+    self.gcore_cmd = self.get_command_from_path("gcore")
+    self.pstack_cmd = self.get_command_from_path("pstack")
+
+  def create_output_dir_structure(self):
+    """Creates the skeleton directory structure for the diagnostics output collection."""
+    self.collection_root_dir = tempfile.mkdtemp(prefix="impala-diagnostics-%s" %
+        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S-"),
+        dir=os.path.abspath(self.args.output_dir))
+    for dirname in self.OUTPUT_DIRS_TO_CREATE:
+      os.mkdir(os.path.join(self.collection_root_dir, dirname))
+
+  def get_command_from_path(self, cmd):
+    """Returns the path to a given command executable, if one exists in the
+    system PATH."""
+    for path in os.environ["PATH"].split(os.pathsep):
+      cmd_path = os.path.join(path, cmd)
+      if os.access(cmd_path, os.X_OK):
+        return cmd_path
+    return ""
+
+  def get_target_process_name(self):
+    """Returns the process name of the target process for which diagnostics
+    should be collected."""
+    try:
+      return open("/proc/%s/comm" % self.args.pid).read().strip()
+    except Exception:
+      logging.exception("Failed to get target process name.")
+      return ""
+
+  def get_num_child_proc(self, name):
+    """Returns number of processes with the given name and target Impala pid
+    as parent."""
+    cmd = Command(["pgrep", "-c", "-P", str(self.args.pid), name])
+    cmd.run()
+    return int(cmd.stdout.strip())
+
+  def get_java_home_from_env(self):
+    """Returns JAVA_HOME set in the env of the target process."""
+    try:
+      envs = open("/proc/%s/environ" % self.args.pid).read().split("\0")
+      for s in envs:
+        k, v = s.split("=", 1)
+        if k == "JAVA_HOME":
+          return v
+    except Exception:
+      logging.exception("Failed to determine JAVA_HOME from proc env.")
+      return ""
+
+  def get_free_disk_space_gbs(self, path):
+    """Returns free disk space (in GBs) of the partition hosting the given path."""
+    s = os.statvfs(path)
+    return (s.f_bsize * s.f_bavail)/(1024.0 * 1024.0 * 1024.0)
+
+  def get_minidump_create_timestamp(self, minidump_path):
+    """Returns the unix timestamp of the minidump create time. It is extracted from
+    the minidump header."""
+    # Read the minidump's header to extract the create time stamp. More information about
+    # the mindump header format can be found here: https://goo.gl/uxKZVe
+    #
+    # typedef struct {
+    #   uint32_t  signature;
+    #   uint32_t  version;
+    #   uint32_t  stream_count;
+    #   MDRVA     stream_directory_rva;  /* A |stream_count|-sized array of
+    #                                     * MDRawDirectory structures. */
+    #   uint32_t  checksum;              /* Can be 0.  In fact, that's all that's
+    #                                     * been found in minidump files. */
+    #   uint32_t  time_date_stamp;       /* time_t */
+    #   uint64_t  flags;
+    # } MDRawHeader;  /* MINIDUMP_HEADER */
+    s = Struct("IIIiIIQ")
+    data = open(minidump_path, "rb").read(s.size)
+    header = self.MINIDUMP_HEADER(*s.unpack_from(data))
+    return header.time_date_stamp
+
+  def wait_for_minidump(self):
+    """Minidump collection is async after sending the SIGUSR1 signal. So this method
+    waits till it is written to the disk. Since minidump forks off a new process from
+    the parent Impala process we need to wait till the forked process exits.
+    Returns after 30s to prevent infinite waiting. Should be called after sending the
+    SIGUSR1 signal to the Impala process."""
+    MAX_WAIT_TIME_S = 30
+    start_time = time.time()
+    while time.time() < start_time + MAX_WAIT_TIME_S:
+      # Sleep for a bit to ensure that the process fork to write minidump has started.
+      # Otherwise the subsequent check on the process count could pass even when the
+      # fork didn't succeed. This sleep reduces the likelihood of such race.
+      time.sleep(1)
+      if self.get_num_child_proc(self.target_process_name) == 1:
+        break
+    return
+
+  def validate_args(self):
+    """Returns True if self.args are valid, false otherwise"""
+    if self.args.pid <= 0:
+      logging.critical("Invalid PID provided.")
+      return False
+
+    if self.target_process_name not in self.IMPALA_PROCESSES:
+      logging.critical("No valid Impala process with the given PID %s" % str(self.args.pid))
+      return False
+
+    if not self.java_home:
+      logging.critical("JAVA_HOME could not be inferred from process env.\
+          Please specify --java_home.")
+      return False
+
+    if self.args.jmap and not os.path.exists(self.jmap_cmd):
+      logging.critical("jmap binary not found, required to collect a Java heap dump.")
+      return False
+
+    if self.args.gcore and not os.path.exists(self.gcore_cmd):
+      logging.critical("gcore binary not found, required to collect a core dump.")
+      return False
+
+    if self.args.profiles_dir and not os.path.isdir(self.args.profiles_dir):
+      logging.critical("No valid profiles directory at path: %s" % self.args.profiles_dir)
+      return False
+
+    return True
+
+  def collect_thread_stacks(self):
+    """Collects jstack/jstack-m/pstack for the given pid in that order. pstack collection
+    falls back to minidumps if pstack binary is missing from the system path. Minidumps
+    are collected by sending a SIGUSR1 to the Impala process and then archiving the
+    contents of the minidump directory. The number of times stacks are collected and the
+    sleep time between the collections are controlled by --stacks argument."""
+    stacks_count, stacks_interval_secs = self.args.stacks
+    if stacks_count <= 0 or stacks_interval_secs < 0:
+      return
+
+    # Skip jstack collection if the jstack binary does not exist.
+    skip_jstacks = not os.path.exists(self.jstack_cmd)
+    if skip_jstacks:
+      logging.info("Skipping jstack collection since jstack binary couldn't be located.")
+
+    # Fallback to breakpad minidump collection if pstack binaries are missing.
+    fallback_to_minidump = False
+    if not self.pstack_cmd:
+      # Fall back to collecting a minidump if pstack is not installed.
+      if not os.path.exists(self.args.minidumps_dir):
+        logging.info("Skipping pstacks since pstack binary couldn't be located. Provide "
+            + "--minidumps_dir for collecting minidumps instead.")
+        # At this point, we can't proceed since we have nothing to collect.
+        if skip_jstacks:
+          return
+      else:
+        fallback_to_minidump = True;
+        logging.info("Collecting breakpad minidumps since pstack/gdb binaries are " +
+            "missing.")
+
+    stacks_dir = os.path.join(self.collection_root_dir, "stacks")
+    # Populate the commands to run in 'cmds_to_run' depending on what kinds of thread
+    # stacks to collect. Each entry is a tuple of form
+    # (Command, stdout_prefix, is_minidump). 'is_minidump' tells whether the command
+    # is trying to trigger a minidump collection.
+    cmds_to_run = []
+    if not skip_jstacks:
+      cmd_args = [self.jstack_cmd, str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), "jstack", False))
+      # Collect mixed-mode jstack, contains native stack frames.
+      cmd_args_mixed_mode = [self.jstack_cmd, "-m", str(self.args.pid)]
+      cmds_to_run.append(
+          (Command(cmd_args_mixed_mode, self.args.timeout), "jstack-m", False))
+
+    if fallback_to_minidump:
+      cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), None, True))
+    elif self.pstack_cmd:
+      cmd_args = [self.pstack_cmd, str(self.args.pid)]
+      cmds_to_run.append((Command(cmd_args, self.args.timeout), "pstack", False))
+
+    collection_start_ts = time.time()
+    for i in xrange(stacks_count):
+      for cmd, file_prefix, is_minidump in cmds_to_run:
+        if file_prefix:
+          stdout_file = os.path.join(stacks_dir, file_prefix + "-" + str(i) + ".txt")
+          with open(stdout_file, "w") as output:
+            cmd.run(cmd_stdout=output)
+        else:
+          cmd.run()
+          # Incase of minidump collection, wait for it to be written.
+          if is_minidump:
+            self.wait_for_minidump()
+      time.sleep(stacks_interval_secs)
+
+    # Copy minidumps if required.
+    if fallback_to_minidump:
+      minidump_out_dir =  os.path.join(self.collection_root_dir, "minidumps")
+      self.copy_minidumps(minidump_out_dir, collection_start_ts);
+
+  def collect_minidumps(self):
+    """Collects minidumps on the Impala process based on argument --minidumps. The
+    minidumps are collected by sending a SIGUSR1 signal to the Impala process and then
+    the resulting minidumps are copied to the target directory."""
+    minidump_count, minidump_interval_secs = self.args.minidumps
+    if minidump_count <= 0 or minidump_interval_secs < 0:
+      return
+    # Impala process writes a minidump when it encounters a SIGUSR1.
+    cmd_args = ["kill", "-SIGUSR1", str(self.args.pid)]
+    cmd = Command(cmd_args, self.args.timeout)
+    collection_start_ts = time.time()
+    for i in xrange(minidump_count):
+      cmd.run()
+      self.wait_for_minidump()
+      time.sleep(minidump_interval_secs)
+    out_dir = os.path.join(self.collection_root_dir, "minidumps")
+    self.copy_minidumps(out_dir, collection_start_ts);
+
+  def copy_minidumps(self, target, start_ts):
+    """Copies mindumps with create time >= start_ts to 'target' directory."""
+    logging.info("Copying minidumps from %s to %s with ctime >= %s"
+        % (self.args.minidumps_dir, target, start_ts))
+    for filename in glob.glob(os.path.join(self.args.minidumps_dir, "*.dmp")):
+      try:
+        minidump_ctime = self.get_minidump_create_timestamp(filename)
+        if minidump_ctime >= math.floor(start_ts):
+          shutil.copy2(filename, target)
+        else:
+          logging.info("Ignored mindump: %s ctime: %s" % (filename, minidump_ctime))
+      except Exception:
+        logging.exception("Error processing minidump at path: %s. Skipping it." % filename)
+
+  def collect_java_heapdump(self):
+    """Generates the Java heap dump of the Impala process using the 'jmap' command."""
+    if not self.args.jmap:
+      return
+    jmap_dir = os.path.join(self.collection_root_dir, "jmaps")
+    out_file = os.path.join(jmap_dir, self.target_process_name + "_heap.bin")
+    # jmap command requires it to be run as the process owner.
+    # Command: jmap -dump:format=b,file=<outfile> <pid>
+    cmd_args = [self.jmap_cmd, "-dump:format=b,file=" + out_file, str(self.args.pid)]
+    Command(cmd_args, self.args.timeout).run()
+
+  def collect_native_coredump(self):
+    """Generates the core dump of the Impala process using the 'gcore' command"""
+    if not self.args.gcore:
+      return
+    # Command: gcore -o <outfile> <pid>
+    gcore_dir = os.path.join(self.collection_root_dir, "gcores")
+    out_file_name = self.target_process_name + "-" +\
+        datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + ".core"
+    out_file = os.path.join(gcore_dir, out_file_name)
+    cmd_args = [self.gcore_cmd, "-o", out_file, str(self.args.pid)]
+    Command(cmd_args, self.args.timeout).run()
+
+  def collect_query_profiles(self):
+    """Collects Impala query profiles from --profiles_dir. Enforces an uncompressed limit
+    of --profiles_max_size_limit bytes on the copied profile logs."""
+    if not self.args.profiles_dir:
+      return
+    out_dir = os.path.join(self.collection_root_dir, "profiles")
+    # Hardcoded in Impala
+    PROFILE_LOG_FILE_PATTERN = "impala_profile_log_1.1-*";
+    logging.info("Collecting profile data, limiting size to %f GB" %
+        (self.args.profiles_max_size_limit/(1024 * 1024 * 1024)))
+
+    profiles_path = os.path.join(self.args.profiles_dir, PROFILE_LOG_FILE_PATTERN)
+    # Sort the profiles by creation time and copy the most recent ones in that order.
+    sorted_profiles =\
+        sorted(glob.iglob(profiles_path), key=os.path.getctime, reverse=True)
+    profile_size_included_so_far = 0
+    for profile_path in sorted_profiles:
+      try:
+        file_size = os.path.getsize(profile_path)
+        if profile_size_included_so_far + file_size > self.args.profiles_max_size_limit:
+          # Copying the whole file violates profiles_max_size_limit. Copy a part of it.
+          # Profile logs are newline delimited with a single profile per line.
+          num_bytes_to_copy =\
+              self.args.profiles_max_size_limit - profile_size_included_so_far
+          file_name = os.path.basename(profile_path)
+          copied_bytes = 0
+          with open(profile_path, "rb") as in_file,\
+              open(os.path.join(out_dir, file_name), "wb") as out_file:
+            for line in in_file.readlines():
+              if copied_bytes + len(line) > num_bytes_to_copy:
+                break
+              out_file.write(line)
+              copied_bytes += len(line)
+          return
+        profile_size_included_so_far += file_size
+        shutil.copy2(profile_path, out_dir)
+      except:
+        logging.exception("Encountered an error while collecting profile %s. Skipping it."
+            % profile_path)
+
+  def collect_shared_libs(self):
+    """Collects shared libraries loaded by the target Impala process."""
+    # Shared libs are collected if either of core dump or minidumps are enabled.
+    if not (self.args.gcore or self.args.minidumps_dir):
+      return
+    out_dir = os.path.join(self.collection_root_dir, "shared_libs")
+
+    script_path = os.path.join(self.script_dir, "collect_shared_libs.sh")
+    cmd_args = [script_path, self.gdb_cmd, str(self.args.pid), out_dir]
+    Command(cmd_args, self.args.timeout).run()
+
+  def cleanup(self):
+    """Cleans up the directory to which diagnostics were written."""
+    shutil.rmtree(self.collection_root_dir, ignore_errors=True)
+
+  def get_diagnostics(self):
+    """Calls all collect_*() methods to collect diagnostics. Returns True if no errors
+    were encountered during diagnostics collection, False otherwise."""
+    if not self.validate_args():
+      return False
+    logging.info("Using JAVA_HOME: %s" % self.java_home)
+    self.create_output_dir_structure()
+    logging.info("Free disk space: %.2fGB" %
+        self.get_free_disk_space_gbs(self.collection_root_dir))
+    os.chdir(self.args.output_dir)
+    collection_methods = [self.collect_shared_libs, self.collect_query_profiles,
+        self.collect_native_coredump, self.collect_java_heapdump, self.collect_minidumps,
+        self.collect_thread_stacks]
+    exception_encountered = False
+    for method in collection_methods:
+      try:
+        method()
+      except IOError as e:
+        if e.errno == errno.ENOSPC:
+          # Clean up and abort if we are low on disk space. Other IOErrors are logged and
+          # ignored.
+          logging.exception("Disk space low, aborting.")
+          self.cleanup()
+          return False
+        logging.exception("Encountered an IOError calling: %s" % method.__name__)
+        exception_encountered = True
+      except Exception:
+        exception_encountered = True
+        logging.exception("Encountered an exception calling: %s" % method.__name__)
+    if exception_encountered:
+      logging.error("Encountered an exception collecting diagnostics. Final output " +
+          "could be partial.\n")
+    # Archive the directory, even if it is partial.
+    archive_path = self.collection_root_dir + ".tar.gz"
+    logging.info("Archiving diagnostics to path: %s" % archive_path)
+    shutil.make_archive(self.collection_root_dir, "gztar", self.collection_root_dir)
+    self.cleanup()
+    logging.info("Diagnostics collected at path: %s" % archive_path)
+    return not exception_encountered
+
+def get_args_parser():
+  """Creates the argument parser and adds the flags"""
+  parser = argparse.ArgumentParser(description="Impala diagnostics collection")
+  parser.add_argument("--pid", action="store", dest="pid", type=int, default=0,
+      help="PID of the Impala process for which diagnostics should be collected.")
+  parser.add_argument("--java_home", action="store", dest="java_home", default="",
+      help="If not set, it is set to the JAVA_HOME from the pid's environment.")
+  parser.add_argument("--timeout", action="store", dest="timeout", default=300,
+      type=int, help="Timeout (in seconds) for each of the diagnostics commands")
+  parser.add_argument("--stacks", action="store", dest="stacks", nargs=2, type=int,
+      default=[0, 0], metavar=("COUNT", "INTERVAL (in seconds)"),
+      help="Collect jstack, mixed-mode jstack and pstacks of the Impala process.\
+      Breakpad minidumps are collected in case of missing pstack binaries.")
+  parser.add_argument("--jmap", action="store_true", dest="jmap", default=False,
+      help="Collect heap dump of the Java process")
+  parser.add_argument("--gcore", action="store_true", dest="gcore", default=False,
+      help="Collect the native core dump using gdb. Requires gdb to be installed.")
+  parser.add_argument("--minidumps", action="store", dest="minidumps", type=int,
+      nargs=2, default=[0, 0], metavar=("COUNT", "INTERVAL (in seconds)"),
+      help="Collect breakpad minidumps for the Impala process. Requires --minidumps_dir\
+      be set.")
+  parser.add_argument("--minidumps_dir", action="store", dest="minidumps_dir", default="",
+      help="Path of the directory to which Impala process' minidumps are written")
+  parser.add_argument("--profiles_dir", action="store", dest="profiles_dir", default="",
+      help="Path of the profiles directory to be included in the diagnostics output.")
+  parser.add_argument("--profiles_max_size_limit", action="store",
+      dest="profiles_max_size_limit", default=3*1024*1024*1024,
+      type=float, help="Uncompressed limit (in Bytes) on profile logs collected from\
+      --profiles_dir. Defaults to 3GB.")
+  parser.add_argument("--output_dir", action="store", dest="output_dir",
+      default = tempfile.gettempdir(), help="Output directory that contains the final "
+      "diagnostics data. Defaults to %s" % tempfile.gettempdir())
+  return parser
+
+if __name__ == "__main__":
+  parser = get_args_parser()
+  if len(sys.argv) == 1:
+    parser.print_usage()
+    sys.exit(1)
+  logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
+      format="%(asctime)s %(levelname)-8s %(message)s")
+  diagnostics_handler = ImpalaDiagnosticsHandler(parser.parse_args())
+  logging.info("Running as user: %s" % getpass.getuser())
+  logging.info("Input args: %s" % " ".join(sys.argv))
+  sys.exit(0 if diagnostics_handler.get_diagnostics() else 1)

http://git-wip-us.apache.org/repos/asf/impala/blob/25126e8e/bin/diagnostics/collect_shared_libs.sh
----------------------------------------------------------------------
diff --git a/bin/diagnostics/collect_shared_libs.sh b/bin/diagnostics/collect_shared_libs.sh
new file mode 100755
index 0000000..d5de349
--- /dev/null
+++ b/bin/diagnostics/collect_shared_libs.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# $1 - gdb binary path
+# $2 - pid of the Impala process
+# $3 - Output directory to copy the sharedlibs to.
+
+set -euxo pipefail
+
+if [ "$#" -ne 3 ]; then
+  echo "Incorrect usage. Expected: $0 <gdb executable path> <target PID> <output dir>"
+  exit 1
+fi
+
+if [ ! -d $3 ]; then
+  echo "Directory $3 does not exist. This script expects the output directory to exist."
+  exit 1
+fi
+
+# Generate the list of shared libs path to copy.
+shared_libs_to_copy=$(mktemp)
+$1 --pid $2 --batch -ex 'info shared' 2> /dev/null | sed '1,/Shared Object Library/d' |
+    sed 's/\(.*\s\)\(\/.*\)/\2/' | grep \/ > $shared_libs_to_copy
+
+echo "Generated shared library listing for the process."
+
+# Copy the files to the target directory keeping the directory structure intact.
+# We use rsync instead of 'cp --parents' since the latter has permission issues
+# copying from system level directories. https://goo.gl/6yYNhw
+rsync -LR --files-from=$shared_libs_to_copy / $3
+
+echo "Copied the shared libraries to the target directory: $3"
+
+rm -f $shared_libs_to_copy
+# Make sure the impala user has write permissions on all the copied sharedlib paths.
+chmod 755 -R $3

http://git-wip-us.apache.org/repos/asf/impala/blob/25126e8e/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 5bb13f0..66a699a 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -17,6 +17,7 @@ shell/__init__.py
 ssh_keys/id_rsa_impala
 testdata/__init__.py
 tests/__init__.py
+bin/diagnostics/__init__.py
 www/index.html
 
 # See $IMPALA_HOME/LICENSE.txt

http://git-wip-us.apache.org/repos/asf/impala/blob/25126e8e/tests/unittests/test_command.py
----------------------------------------------------------------------
diff --git a/tests/unittests/test_command.py b/tests/unittests/test_command.py
new file mode 100644
index 0000000..a2a9e4c
--- /dev/null
+++ b/tests/unittests/test_command.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Unit tests for collect_diagnostics.Command
+
+import os
+import pytest
+import sys
+
+# Update the sys.path to include the modules from bin/diagnostics.
+sys.path.insert(0,
+    os.path.abspath(os.path.join(os.path.dirname(__file__), '../../bin/diagnostics')))
+from collect_diagnostics import Command
+
+class TestCommand(object):
+  """ Unit tests for the Command class"""
+
+  def test_simple_commands(self):
+    # Successful command
+    c = Command(["echo", "foo"], 1000)
+    assert c.run() == 0, "Command expected to succeed, but failed"
+    assert c.stdout.strip("\n") == "foo"
+
+    # Failed command, check return code
+    c = Command(["false"], 1000)
+    assert c.run() == 1
+
+  def test_command_timer(self):
+    # Try to run a command that sleeps for 1000s and set a
+    # timer for 1 second. The command should timed out.
+    c = Command(["sleep", "1000"], 1)
+    assert c.run() != 0, "Command expected to timeout but succeeded."
+    assert c.child_killed_by_timeout, "Command didn't timeout as expected."
+
+


[05/12] impala git commit: KUDU-2385: Fix typo in KinitContext::DoRenewal()

Posted by ph...@apache.org.
KUDU-2385: Fix typo in KinitContext::DoRenewal()

On platforms without krb5_get_init_creds_opt_set_out_ccache(),
krb5_cc_store_cred() is called to insert the newly acquired
credential into the ccache. However, there was a typo in the code
which resulted in inserting the old credential into ccache.
This change fixes the typo to make sure the new credential is
inserted into ccache.

Testing done: confirmed on SLES11 that the new credential
is being inserted by checking the 'auth time' of the ticket
in ccache. Impala uses a slightly different #ifdef which
explicitly checks if krb5_get_init_creds_opt_set_out_ccache()
is defined on the platform so this code path is actually
used when running Impala on SLES11.

Change-Id: I3a22b8d41d15eb1982a3fd5b96575e28edaad31c
Reviewed-on: http://gerrit.cloudera.org:8080/9840
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9842
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/009564ab
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/009564ab
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/009564ab

Branch: refs/heads/2.x
Commit: 009564ab6d30d681c3cc199785aa6f1642b54f4d
Parents: 25126e8
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Mar 28 10:53:24 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/init.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/009564ab/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 7674c7e..340ba75 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -299,7 +299,7 @@ Status KinitContext::DoRenewal() {
       KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_initialize(g_krb5_ctx, ccache_, principal_),
                                  "Reacquire error: could not init ccache");
 
-      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &creds),
+      KRB5_RETURN_NOT_OK_PREPEND(krb5_cc_store_cred(g_krb5_ctx, ccache_, &new_creds),
                                  "Reacquire error: could not store creds in cache");
 #endif
     }


[08/12] impala git commit: IMPALA-5384, part 1: introduce DmlExecState

Posted by ph...@apache.org.
IMPALA-5384, part 1: introduce DmlExecState

This change is based on a patch by Marcel Kornacker.

Move data structures that collect DML operation stats from the
RuntimeState and Coordinator into a new InsertExecState class, which
has it's own lock.  This removes a dependency on the coordinator's
lock, which will allow further coordinator locking cleanup in the next
patch.

Change-Id: Id4c025917620a7bff2acbeb46464f107ab4b7565
Reviewed-on: http://gerrit.cloudera.org:8080/9793
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e882cbb9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e882cbb9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e882cbb9

Branch: refs/heads/2.x
Commit: e882cbb999fb6cb7722d0523b601a1d2026bf3bd
Parents: d3617bc
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Mar 23 16:28:27 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 07:17:25 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc         |   1 +
 be/src/exec/catalog-op-executor.cc          |   1 +
 be/src/exec/data-sink.cc                    |  63 ---
 be/src/exec/data-sink.h                     |   9 -
 be/src/exec/hbase-table-sink.cc             |  11 +-
 be/src/exec/hdfs-table-sink.cc              |  38 +-
 be/src/exec/kudu-table-sink.cc              |  22 +-
 be/src/exec/plan-root-sink.cc               |   1 +
 be/src/runtime/CMakeLists.txt               |   1 +
 be/src/runtime/coordinator-backend-state.cc |  11 +-
 be/src/runtime/coordinator-backend-state.h  |  18 +-
 be/src/runtime/coordinator.cc               | 361 ++---------------
 be/src/runtime/coordinator.h                |  84 +---
 be/src/runtime/dml-exec-state.cc            | 494 +++++++++++++++++++++++
 be/src/runtime/dml-exec-state.h             | 149 +++++++
 be/src/runtime/query-state.cc               |  13 +-
 be/src/runtime/runtime-filter-bank.cc       |   1 +
 be/src/runtime/runtime-state.h              |  29 +-
 be/src/service/client-request-state.cc      |  10 +-
 be/src/service/client-request-state.h       |   1 +
 be/src/service/impala-beeswax-server.cc     |  18 +-
 be/src/service/impala-hs2-server.cc         |   1 +
 be/src/service/impala-http-handler.cc       |   2 +
 be/src/service/impala-server.cc             |   2 +
 be/src/service/impala-server.h              |   3 -
 be/src/testutil/in-process-servers.cc       |   1 +
 26 files changed, 750 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b10a70f..1b995a5 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -53,6 +53,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "service/fe-support.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 12398cf..164187c 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -25,6 +25,7 @@
 #include "runtime/lib-cache.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f8f068e..9140b3e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -121,69 +121,6 @@ Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
   return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
 }
 
-void DataSink::MergeDmlStats(const TInsertStats& src_stats,
-    TInsertStats* dst_stats) {
-  dst_stats->bytes_written += src_stats.bytes_written;
-  if (src_stats.__isset.kudu_stats) {
-    if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats());
-    if (!dst_stats->kudu_stats.__isset.num_row_errors) {
-      dst_stats->kudu_stats.__set_num_row_errors(0);
-    }
-    dst_stats->kudu_stats.__set_num_row_errors(
-        dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
-  }
-  if (src_stats.__isset.parquet_stats) {
-    if (dst_stats->__isset.parquet_stats) {
-      MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
-          &dst_stats->parquet_stats.per_column_size);
-    } else {
-      dst_stats->__set_parquet_stats(src_stats.parquet_stats);
-    }
-  }
-}
-
-string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
-    const string& prefix) {
-  const char* indent = "  ";
-  stringstream ss;
-  ss << prefix;
-  bool first = true;
-  for (const PartitionStatusMap::value_type& val: stats) {
-    if (!first) ss << endl;
-    first = false;
-    ss << "Partition: ";
-
-    const string& partition_key = val.first;
-    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
-      ss << "Default" << endl;
-    } else {
-      ss << partition_key << endl;
-    }
-    if (val.second.__isset.num_modified_rows) {
-      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
-    }
-
-    if (!val.second.__isset.stats) continue;
-    const TInsertStats& stats = val.second.stats;
-    if (stats.__isset.kudu_stats) {
-      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
-    }
-
-    ss << indent << "BytesWritten: "
-       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
-    if (stats.__isset.parquet_stats) {
-      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
-      ss << endl << indent << "Per Column Sizes:";
-      for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
-           i != parquet_stats.per_column_size.end(); ++i) {
-        ss << endl << indent << indent << i->first << ": "
-           << PrettyPrinter::Print(i->second, TUnit::BYTES);
-      }
-    }
-  }
-  return ss.str();
-}
-
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != nullptr);
   DCHECK(profile_ != nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d2e80af..605b46d 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,15 +85,6 @@ class DataSink {
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
 
-  /// Merges one update to the DML stats for a partition. dst_stats will have the
-  /// combined stats of src_stats and dst_stats after this method returns.
-  static void MergeDmlStats(const TInsertStats& src_stats,
-      TInsertStats* dst_stats);
-
-  /// Outputs the DML stats contained in the map of partition updates to a string
-  static std::string OutputDmlStats(const PartitionStatusMap& stats,
-      const std::string& prefix = "");
-
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 9e591ac..567e8bd 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -59,12 +59,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track
   RETURN_IF_ERROR(hbase_table_writer_->Init(state));
 
   // Add a 'root partition' status in which to collect insert statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.__set_id(-1L);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
-
+  state->dml_exec_state()->AddPartition(ROOT_PARTITION_KEY, -1L, nullptr);
   return Status::OK();
 }
 
@@ -74,8 +69,8 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
-      batch->num_rows();
+  state->dml_exec_state()->UpdatePartition(
+      ROOT_PARTITION_KEY, batch->num_rows(), nullptr);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index c12b711..ca08bac 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -403,8 +403,9 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   COUNTER_ADD(files_created_counter_, 1);
 
   if (!ShouldSkipStaging(state, output_partition)) {
-    // Save the ultimate destination for this file (it will be moved by the coordinator)
-    (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location;
+    // Save the ultimate destination for this file (it will be moved by the coordinator).
+    state->dml_exec_state()->AddFileToMove(
+        output_partition->current_file_name, final_location);
   }
 
   ++output_partition->num_files;
@@ -573,21 +574,15 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
       return status;
     }
 
-    // Save the partition name so that the coordinator can create the partition directory
-    // structure if needed
-    DCHECK(state->per_partition_status()->find(partition->partition_name) ==
-        state->per_partition_status()->end());
-    TInsertPartitionStatus partition_status;
-    partition_status.__set_num_modified_rows(0L);
-    partition_status.__set_id(partition_descriptor->id());
-    partition_status.__set_stats(TInsertStats());
-    partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
-    state->per_partition_status()->insert(
-        make_pair(partition->partition_name, partition_status));
+    // Save the partition name so that the coordinator can create the partition
+    // directory structure if needed.
+    state->dml_exec_state()->AddPartition(
+        partition->partition_name, partition_descriptor->id(),
+        &table_desc_->hdfs_base_dir());
 
     if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
-      // Indicate that temporary directory is to be deleted after execution
-      (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
+      // Indicate that temporary directory is to be deleted after execution.
+      state->dml_exec_state()->AddFileToMove(partition->tmp_hdfs_dir_name, "");
     }
 
     partition_keys_to_output_partitions_[key].first = std::move(partition);
@@ -643,17 +638,8 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
   // OutputPartition writer could be nullptr if there is no row to output.
   if (partition->writer.get() != nullptr) {
     RETURN_IF_ERROR(partition->writer->Finalize());
-
-    // Track total number of appended rows per partition in runtime
-    // state. partition->num_rows counts number of rows appended is per-file.
-    PartitionStatusMap::iterator it =
-        state->per_partition_status()->find(partition->partition_name);
-
-    // Should have been created in GetOutputPartition() when the partition was
-    // initialised.
-    DCHECK(it != state->per_partition_status()->end());
-    it->second.num_modified_rows += partition->num_rows;
-    DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
+    state->dml_exec_state()->UpdatePartition(
+        partition->partition_name, partition->num_rows, &partition->writer->stats());
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 05f1d06..67bb86e 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,6 @@ using kudu::client::KuduError;
 
 namespace impala {
 
-const static string& ROOT_PARTITION_KEY =
-    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
@@ -92,15 +89,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
       << "TableDescriptor must be an instance KuduTableDescriptor.";
   table_desc_ = static_cast<const KuduTableDescriptor*>(table_desc);
 
-  // Add a 'root partition' status in which to collect write statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_id(-1L);
-  TKuduDmlStats kudu_dml_stats;
-  kudu_dml_stats.__set_num_row_errors(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.stats.__set_kudu_stats(kudu_dml_stats);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
+  state->dml_exec_state()->InitForKuduDml();
 
   // Add counters
   total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
@@ -338,12 +327,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
     VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
   }
   Status status = CheckForErrors(state);
-  TInsertPartitionStatus& insert_status =
-      (*state->per_partition_status())[ROOT_PARTITION_KEY];
-  insert_status.__set_num_modified_rows(
-      total_rows_->value() - num_row_errors_->value());
-  insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
-  insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
+  state->dml_exec_state()->SetKuduDmlStats(
+      total_rows_->value() - num_row_errors_->value(), num_row_errors_->value(),
+      client_->GetLatestObservedTimestamp());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 1cdc544..836a376 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -117,6 +117,7 @@ void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
   // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
   // well.
+  // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_?
   sender_done_ = true;
   consumer_cv_.NotifyAll();
   // Wait for consumer to be done, in case sender tries to tear-down this sink while the

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0d4b61c..89cbbb9 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Runtime
   data-stream-sender.cc
   debug-options.cc
   descriptors.cc
+  dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
   hbase-table.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 914a3e4..e8db00e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -43,10 +43,7 @@ Coordinator::BackendState::BackendState(
     const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
   : query_id_(query_id),
     state_idx_(state_idx),
-    filter_mode_(filter_mode),
-    rpc_latency_(0),
-    rpc_sent_(false),
-    peak_consumption_(0L) {
+    filter_mode_(filter_mode) {
 }
 
 void Coordinator::BackendState::Init(
@@ -413,11 +410,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
     const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
     ObjectPool* obj_pool)
   : exec_params_(exec_params),
-    profile_(nullptr),
-    done_(false),
-    profile_created_(false),
-    total_split_size_(0),
-    total_ranges_complete_(0) {
+    profile_(nullptr) {
   const string& profile_name = Substitute("Instance $0 (host=$1)",
       PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
   profile_ = RuntimeProfile::Create(obj_pool, profile_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 0973ca3..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -161,24 +161,24 @@ class Coordinator::BackendState {
     int64_t last_report_time_ms_ = 0;
 
     /// owned by coordinator object pool provided in the c'tor, created in Update()
-    RuntimeProfile* profile_;
+    RuntimeProfile* profile_ = nullptr;
 
     /// true if the final report has been received for the fragment instance.
     /// Used to handle duplicate done ReportExecStatus RPC messages. Used only
     /// in ApplyExecStatusReport()
-    bool done_;
+    bool done_ = false;
 
     /// true after the first call to profile->Update()
-    bool profile_created_;
+    bool profile_created_ = false;
 
     /// cumulative size of all splits; set in c'tor
-    int64_t total_split_size_;
+    int64_t total_split_size_ = 0;
 
     /// wall clock timer for this instance
     MonotonicStopWatch stopwatch_;
 
     /// total scan ranges complete across all scan nodes
-    int64_t total_ranges_complete_;
+    int64_t total_ranges_complete_ = 0;
 
     /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
     std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
@@ -215,7 +215,7 @@ class Coordinator::BackendState {
   boost::mutex lock_;
 
   // number of in-flight instances
-  int num_remaining_instances_;
+  int num_remaining_instances_ = 0;
 
   /// If the status indicates an error status, execution has either been aborted by the
   /// executing impalad (which then reported the error) or cancellation has been
@@ -235,15 +235,15 @@ class Coordinator::BackendState {
   ErrorLogMap error_log_;
 
   /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
-  int64_t rpc_latency_;
+  int64_t rpc_latency_ = 0;
 
   /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
   /// successful.
-  bool rpc_sent_;
+  bool rpc_sent_ = false;
 
   /// peak memory used for this query (value of that node's query memtracker's
   /// peak_consumption()
-  int64_t peak_consumption_;
+  int64_t peak_consumption_ = 0;
 
   /// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
   int64_t last_report_time_ms_ = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a5d53b2..e6b3bca 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -63,10 +63,7 @@ using std::unique_ptr;
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
-DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
-    "INSERTs will inherit the permissions of their parent directories");
-
-namespace impala {
+using namespace impala;
 
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
@@ -93,9 +90,6 @@ Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
   DCHECK(request.plan_exec_info.size() > 0);
 
-  needs_finalization_ = request.__isset.finalize_params;
-  if (needs_finalization_) finalize_params_ = request.finalize_params;
-
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
              << " stmt=" << request.query_ctx.client_request.stmt;
   stmt_type_ = request.stmt_type;
@@ -489,291 +483,32 @@ Status Coordinator::UpdateStatus(const Status& status, const string& backend_hos
   return query_status_;
 }
 
-void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
-    PermissionCache* permissions_cache) {
-  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
-  // location (e.g. host:port) if so.
-  int scheme_end = path_str.find("://");
-  string stripped_str;
-  if (scheme_end != string::npos) {
-    // Skip past the subsequent location:port/ prefix.
-    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
-  } else {
-    stripped_str = path_str;
-  }
-
-  // Get the list of path components, used to build all path prefixes.
-  vector<string> components;
-  split(components, stripped_str, is_any_of("/"));
-
-  // Build a set of all prefixes (including the complete string) of stripped_path. So
-  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
-  vector<string> prefixes;
-  // Stores the current prefix
-  stringstream accumulator;
-  for (const string& component: components) {
-    if (component.empty()) continue;
-    accumulator << "/" << component;
-    prefixes.push_back(accumulator.str());
-  }
-
-  // Now for each prefix, stat() it to see if a) it exists and b) if so what its
-  // permissions are. When we meet a directory that doesn't exist, we record the fact that
-  // we need to create it, and the permissions of its parent dir to inherit.
-  //
-  // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
-  // for each path. If we need to create the directory, we record it as the pair (true,
-  // perms) so that the caller can identify which directories need their permissions
-  // explicitly set.
-
-  // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
-  // current dir doesn't exist).
-  short permissions = 0;
-  for (const string& path: prefixes) {
-    PermissionCache::const_iterator it = permissions_cache->find(path);
-    if (it == permissions_cache->end()) {
-      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
-      if (info != nullptr) {
-        // File exists, so fill the cache with its current permissions.
-        permissions_cache->insert(
-            make_pair(path, make_pair(false, info->mPermissions)));
-        permissions = info->mPermissions;
-        hdfsFreeFileInfo(info, 1);
-      } else {
-        // File doesn't exist, so we need to set its permissions to its immediate parent
-        // once it's been created.
-        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
-      }
-    } else {
-      permissions = it->second.second;
-    }
-  }
-}
-
-Status Coordinator::FinalizeSuccessfulInsert() {
-  PermissionCache permissions_cache;
-  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
-  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
-
-  // INSERT finalization happens in the five following steps
-  // 1. If OVERWRITE, remove all the files in the target directory
-  // 2. Create all the necessary partition directories.
-  HdfsTableDescriptor* hdfs_table;
-  RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx_.desc_tbl,
-      finalize_params_.table_id, obj_pool(), &hdfs_table));
-  DCHECK(hdfs_table != nullptr)
-      << "INSERT target table not known in descriptor table: "
-      << finalize_params_.table_id;
-
-  // Loop over all partitions that were updated by this insert, and create the set of
-  // filesystem operations required to create the correct partition structure on disk.
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    // INSERT allows writes to tables that have partitions on multiple filesystems.
-    // So we need to open connections to different filesystems as necessary. We use a
-    // local connection cache and populate it with one connection per filesystem that the
-    // partitions are on.
-    hdfsFS partition_fs_connection;
-    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      partition.second.partition_base_dir, &partition_fs_connection,
-          &filesystem_connection_cache));
-
-    // Look up the partition in the descriptor table.
-    stringstream part_path_ss;
-    if (partition.second.id == -1) {
-      // If this is a non-existant partition, use the default partition location of
-      // <base_dir>/part_key_1=val/part_key_2=val/...
-      part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
-    } else {
-      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
-      DCHECK(part != nullptr)
-          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id
-          << "\n" <<  PrintThrift(runtime_state()->instance_ctx());
-      part_path_ss << part->location();
-    }
-    const string& part_path = part_path_ss.str();
-    bool is_s3_path = IsS3APath(part_path.c_str());
-
-    // If this is an overwrite insert, we will need to delete any updated partitions
-    if (finalize_params_.is_overwrite) {
-      if (partition.first.empty()) {
-        // If the root directory is written to, then the table must not be partitioned
-        DCHECK(per_partition_status_.size() == 1);
-        // We need to be a little more careful, and only delete data files in the root
-        // because the tmp directories the sink(s) wrote are there also.
-        // So only delete files in the table directory - all files are treated as data
-        // files by Hive and Impala, but directories are ignored (and may legitimately
-        // be used to store permanent non-table data by other applications).
-        int num_files = 0;
-        // hfdsListDirectory() only sets errno if there is an error, but it doesn't set
-        // it to 0 if the call succeed. When there is no error, errno could be any
-        // value. So need to clear errno before calling it.
-        // Once HDFS-8407 is fixed, the errno reset won't be needed.
-        errno = 0;
-        hdfsFileInfo* existing_files =
-            hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
-        if (existing_files == nullptr && errno == EAGAIN) {
-          errno = 0;
-          existing_files =
-              hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
-        }
-        // hdfsListDirectory() returns nullptr not only when there is an error but also
-        // when the directory is empty(HDFS-8407). Need to check errno to make sure
-        // the call fails.
-        if (existing_files == nullptr && errno != 0) {
-          return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
-        }
-        for (int i = 0; i < num_files; ++i) {
-          const string filename = path(existing_files[i].mName).filename().string();
-          if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
-            partition_create_ops.Add(DELETE, existing_files[i].mName);
-          }
-        }
-        hdfsFreeFileInfo(existing_files, num_files);
-      } else {
-        // This is a partition directory, not the root directory; we can delete
-        // recursively with abandon, after checking that it ever existed.
-        // TODO: There's a potential race here between checking for the directory
-        // and a third-party deleting it.
-        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-          // There is no directory structure in S3, so "inheriting" permissions is not
-          // possible.
-          // TODO: Try to mimic inheriting permissions for S3.
-          PopulatePathPermissionCache(
-              partition_fs_connection, part_path, &permissions_cache);
-        }
-        // S3 doesn't have a directory structure, so we technically wouldn't need to
-        // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
-        // carrying out an operation on that path. So we still need to call CREATE_DIR
-        // before we access that path due to this limitation.
-        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
-          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
-        } else {
-          // Otherwise just create the directory.
-          partition_create_ops.Add(CREATE_DIR, part_path);
-        }
-      }
-    } else if (!is_s3_path
-        || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
-      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
-      // would have already been created by the table sinks.
-      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-        PopulatePathPermissionCache(
-            partition_fs_connection, part_path, &permissions_cache);
-      }
-      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
-        partition_create_ops.Add(CREATE_DIR, part_path);
-      }
-    }
-  }
-
-  // We're done with the HDFS descriptor - free up its resources.
-  hdfs_table->ReleaseResources();
-  hdfs_table = nullptr;
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    if (!partition_create_ops.Execute(
-        ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
-        // It's ok to ignore errors creating the directories, since they may already
-        // exist. If there are permission errors, we'll run into them later.
-        if (err.first->op() != CREATE_DIR) {
-          return Status(Substitute(
-              "Error(s) deleting partition directories. First error (of $0) was: $1",
-              partition_create_ops.errors().size(), err.second));
-        }
-      }
-    }
-  }
-
-  // 3. Move all tmp files
-  HdfsOperationSet move_ops(&filesystem_connection_cache);
-  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
-
-  for (FileMoveMap::value_type& move: files_to_move_) {
-    // Empty destination means delete, so this is a directory. These get deleted in a
-    // separate pass to ensure that we have moved all the contents of the directory first.
-    if (move.second.empty()) {
-      VLOG_ROW << "Deleting file: " << move.first;
-      dir_deletion_ops.Add(DELETE, move.first);
-    } else {
-      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
-      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
-        move_ops.Add(RENAME, move.first, move.second);
-      } else {
-        move_ops.Add(MOVE, move.first, move.second);
-      }
-    }
-  }
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer"));
-    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) moving partition files. First error (of "
-         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 4. Delete temp directories
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
-         "FinalizationTimer"));
-    if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) deleting staging directories. First error (of "
-         << dir_deletion_ops.errors().size() << ") was: "
-         << dir_deletion_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 5. Optionally update the permissions of the created partition directories
-  // Do this last so that we don't make a dir unwritable before we write to it.
-  if (FLAGS_insert_inherit_permissions) {
-    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
-    for (const PermissionCache::value_type& perm: permissions_cache) {
-      bool new_dir = perm.second.first;
-      if (new_dir) {
-        short permissions = perm.second.second;
-        VLOG_QUERY << "INSERT created new directory: " << perm.first
-                   << ", inherited permissions are: " << oct << permissions;
-        chmod_ops.Add(CHMOD, perm.first, permissions);
-      }
-    }
-    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) setting permissions on newly created partition directories. First"
-         << " error (of " << chmod_ops.errors().size() << ") was: "
-         << chmod_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  return Status::OK();
-}
-
-Status Coordinator::FinalizeQuery() {
+Status Coordinator::FinalizeHdfsInsert() {
   // All instances must have reported their final statuses before finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to clean up the
   // staging directory.
   DCHECK(has_called_wait_);
-  DCHECK(needs_finalization_);
+  DCHECK(finalize_params() != nullptr);
 
   VLOG_QUERY << "Finalizing query: " << query_id();
   SCOPED_TIMER(finalization_timer_);
   Status return_status = GetStatus();
   if (return_status.ok()) {
-    return_status = FinalizeSuccessfulInsert();
+    HdfsTableDescriptor* hdfs_table;
+    RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
+            finalize_params()->table_id, obj_pool(), &hdfs_table));
+    DCHECK(hdfs_table != nullptr)
+        << "INSERT target table not known in descriptor table: "
+        << finalize_params()->table_id;
+    return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
+        query_ctx().client_request.query_options.s3_skip_insert_staging,
+        hdfs_table, query_profile_);
+    hdfs_table->ReleaseResources();
   }
 
   stringstream staging_dir;
-  DCHECK(finalize_params_.__isset.staging_dir);
-  staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id(),"_") << "/";
+  DCHECK(finalize_params()->__isset.staging_dir);
+  staging_dir << finalize_params()->staging_dir << "/" << PrintId(query_id(),"_") << "/";
 
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn));
@@ -813,26 +548,26 @@ Status Coordinator::Wait() {
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported
-  // relevant state. They only have relevant state to report in the parallel
-  // INSERT case, otherwise all the relevant state is from the coordinator
-  // fragment which will be available after Open() returns.
-  // Ignore the returned status if finalization is required., since FinalizeQuery() will
-  // pick it up and needs to execute regardless.
+  // Query finalization can only happen when all backends have reported relevant
+  // state. They only have relevant state to report in the parallel INSERT case,
+  // otherwise all the relevant state is from the coordinator fragment which will be
+  // available after Open() returns.  Ignore the returned status if finalization is
+  // required., since FinalizeHdfsInsert() will pick it up and needs to execute
+  // regardless.
   Status status = WaitForBackendCompletion();
-  if (!needs_finalization_ && !status.ok()) return status;
+  if (finalize_params() == nullptr && !status.ok()) return status;
 
   // Execution of query fragments has finished. We don't need to hold onto query execution
   // resources while we finalize the query.
   ReleaseExecResources();
   // Query finalization is required only for HDFS table sinks
-  if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
   // Release admission control resources after we'd done the potentially heavyweight
   // finalization.
   ReleaseAdmissionControlResources();
 
   query_profile_->AddInfoString(
-      "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
+      "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
   // For DML queries, when Wait is done, the query is complete.
   ComputeQuerySummary();
   return status;
@@ -929,7 +664,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
   if (params.__isset.insert_exec_status) {
-    UpdateInsertExecStatus(params.insert_exec_status);
+    dml_exec_state_.Update(params.insert_exec_status);
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
@@ -971,52 +706,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   return Status::OK();
 }
 
-void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) {
-  lock_guard<mutex> l(lock_);
-  for (const PartitionStatusMap::value_type& partition:
-       insert_exec_status.per_partition_status) {
-    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-    status->__set_num_modified_rows(
-        status->num_modified_rows + partition.second.num_modified_rows);
-    status->__set_kudu_latest_observed_ts(std::max(
-        partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
-    status->__set_id(partition.second.id);
-    status->__set_partition_base_dir(partition.second.partition_base_dir);
-
-    if (partition.second.__isset.stats) {
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeDmlStats(partition.second.stats, &status->stats);
-    }
-  }
-  files_to_move_.insert(
-      insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end());
-}
-
-
-uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
-  uint64_t max_ts = 0;
-  for (const auto& entry : per_partition_status_) {
-    max_ts = std::max(max_ts,
-        static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
-  }
-  return max_ts;
-}
-
 RuntimeState* Coordinator::runtime_state() {
   return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }
 
-bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
-  // Assume we are called only after all fragments have completed
-  DCHECK(has_called_wait_);
-
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
-    catalog_update->created_partitions.insert(partition.first);
-  }
-
-  return catalog_update->created_partitions.size() != 0;
-}
-
 // TODO: add histogram/percentile
 void Coordinator::ComputeQuerySummary() {
   // In this case, the query did not even get to start all fragment instances.
@@ -1285,4 +978,8 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   }
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
+
+const TFinalizeParams* Coordinator::finalize_params() const {
+  return schedule_.request().__isset.finalize_params
+      ? &schedule_.request().finalize_params : nullptr;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d630b9a..6665c08 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,7 +38,8 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
+#include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
 #include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
@@ -130,9 +131,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Idempotent.
   void Cancel(const Status* cause = nullptr);
 
-  /// Updates execution status of a particular backend as well as Insert-related
-  /// status (per_partition_status_ and files_to_move_). Also updates
-  /// num_remaining_backends_ and cancels execution if the backend has an error status.
+  /// Updates execution status of a particular backend as well as dml_exec_state_.
+  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+  /// error status.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
@@ -149,17 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   MemTracker* query_mem_tracker() const;
 
   /// This is safe to call only after Wait()
-  const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
-
-  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
-  /// was executed, or 0 if there were no Kudu timestamps reported.
-  /// This should only be called after Wait().
-  uint64_t GetLatestKuduInsertTimestamp() const;
-
-  /// Gathers all updates to the catalog required once this query has completed execution.
-  /// Returns true if a catalog update is required, false otherwise.
-  /// Must only be called after Wait()
-  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
   /// individual fragment instances are merged into a single output to retain readability.
@@ -229,12 +220,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// True if the query needs a post-execution step to tidy up
-  bool needs_finalization_ = false;
-
-  /// Only valid if needs_finalization is true
-  TFinalizeParams finalize_params_;
-
   /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this
   boost::mutex wait_lock_;
 
@@ -275,6 +260,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   ExecSummary exec_summary_;
 
+  /// Filled in as the query completes and tracks the results of DML queries.  This is
+  /// either the union of the reports from all fragment instances, or taken from the
+  /// coordinator fragment: only one of the two can legitimately produce updates.
+  DmlExecState dml_exec_state_;
+
   /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
   RuntimeProfile* query_profile_ = nullptr;
 
@@ -308,21 +298,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// hits 0, any Wait()'ing thread is notified
   int num_remaining_backends_ = 0;
 
-  /// The following two structures, partition_row_counts_ and files_to_move_ are filled in
-  /// as the query completes, and track the results of INSERT queries that alter the
-  /// structure of tables. They are either the union of the reports from all fragment
-  /// instances, or taken from the coordinator fragment: only one of the two can
-  /// legitimately produce updates.
-
-  /// The set of partitions that have been written to or updated by all fragment
-  /// instances, along with statistics such as the number of rows written (may be 0). For
-  /// unpartitioned tables, the empty string denotes the entire table.
-  PartitionStatusMap per_partition_status_;
-
-  /// The set of files to move after an INSERT query has run, in (src, dest) form. An
-  /// empty string for the destination means that a file is to be deleted.
-  FileMoveMap files_to_move_;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
@@ -357,6 +332,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  /// Returns request's finalize params, or nullptr if not present. If not present, then
+  /// HDFS INSERT finalization is not required.
+  const TFinalizeParams* finalize_params() const;
+
+  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+
   /// Only valid *after* calling Exec(). Return nullptr if the running query does not
   /// produce any rows.
   RuntimeState* runtime_state();
@@ -381,9 +362,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateStatus(const Status& status, const std::string& backend_hostname,
       bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
 
-  /// Update per_partition_status_ and files_to_move_.
-  void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);
-
   /// Returns only when either all execution backends have reported success or the query
   /// is in error. Returns the status of the query.
   /// It is safe to call this concurrently, but any calls must be made only after Exec().
@@ -403,29 +381,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// profiles must not be updated while this is running.
   void ComputeQuerySummary();
 
-  /// TODO: move the next 3 functions into a separate class
-
-  /// Determines what the permissions of directories created by INSERT statements should
-  /// be if permission inheritance is enabled. Populates a map from all prefixes of
-  /// path_str (including the full path itself) which is a path in Hdfs, to pairs
-  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
-  /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
-  /// the most immediate ancestor of the path that does exist, i.e. the permissions that
-  /// the path should inherit when created. Otherwise permissions is set to the actual
-  /// permissions of the path. The PermissionCache argument is also used to cache the
-  /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
-  /// same path.
-  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
-  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
-      PermissionCache* permissions_cache);
-
-  /// Moves all temporary staging files to their final destinations.
-  Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT;
-
-  /// Perform any post-query cleanup required. Called by Wait() only after all fragment
-  /// instances have returned, or if the query has failed, in which case it only cleans up
-  /// temporary data rather than finishing the INSERT in flight.
-  Status FinalizeQuery() WARN_UNUSED_RESULT;
+  /// Perform any post-query cleanup required for HDFS (or other Hadoop FileSystem)
+  /// INSERT. Called by Wait() only after all fragment instances have returned, or if
+  /// the query has failed, in which case it only cleans up temporary data rather than
+  /// finishing the INSERT in flight.
+  Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
   /// Populates backend_states_, starts query execution at all backends in parallel, and
   /// blocks until startup completes.

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/dml-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
new file mode 100644
index 0000000..6853da5
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.cc
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/dml-exec-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/filesystem.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "util/pretty-printer.h"
+#include "util/container-util.h"
+#include "util/hdfs-bulk-ops.h"
+#include "util/hdfs-util.h"
+#include "util/runtime-profile-counters.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/exec-env.h"
+#include "gen-cpp/ImpalaService_types.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include "common/names.h"
+
+DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
+    "INSERTs will inherit the permissions of their parent directories");
+
+using namespace impala;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+
+string DmlExecState::OutputPartitionStats(const string& prefix) {
+  lock_guard<mutex> l(lock_);
+  const char* indent = "  ";
+  stringstream ss;
+  ss << prefix;
+  bool first = true;
+  for (const PartitionStatusMap::value_type& val: per_partition_status_) {
+    if (!first) ss << endl;
+    first = false;
+    ss << "Partition: ";
+    const string& partition_key = val.first;
+    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
+      ss << "Default" << endl;
+    } else {
+      ss << partition_key << endl;
+    }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
+    const TInsertStats& stats = val.second.stats;
+    if (stats.__isset.kudu_stats) {
+      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    }
+
+    ss << indent << "BytesWritten: "
+       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
+    if (stats.__isset.parquet_stats) {
+      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
+      ss << endl << indent << "Per Column Sizes:";
+      for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
+           i != parquet_stats.per_column_size.end(); ++i) {
+        ss << endl << indent << indent << i->first << ": "
+           << PrettyPrinter::Print(i->second, TUnit::BYTES);
+      }
+    }
+  }
+  return ss.str();
+}
+
+void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition:
+           dml_exec_status.per_partition_status) {
+    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+    status->__set_num_modified_rows(
+        status->num_modified_rows + partition.second.num_modified_rows);
+    status->__set_kudu_latest_observed_ts(max<uint64_t>(
+            partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
+    status->__set_id(partition.second.id);
+    status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+    if (partition.second.__isset.stats) {
+      if (!status->__isset.stats) status->__set_stats(TInsertStats());
+      MergeDmlStats(partition.second.stats, &status->stats);
+    }
+  }
+  files_to_move_.insert(
+      dml_exec_status.files_to_move.begin(), dml_exec_status.files_to_move.end());
+}
+
+void DmlExecState::AddFileToMove(const string& file_name, const string& location) {
+  lock_guard<mutex> l(lock_);
+  files_to_move_[file_name] = location;
+}
+
+uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
+  lock_guard<mutex> l(lock_);
+  uint64_t max_ts = 0;
+  for (const auto& entry : per_partition_status_) {
+    max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts);
+  }
+  return max_ts;
+}
+
+int64_t DmlExecState::GetNumModifiedRows() {
+  lock_guard<mutex> l(lock_);
+  int64_t result = 0;
+  for (const PartitionStatusMap::value_type& p: per_partition_status_) {
+    result += p.second.num_modified_rows;
+  }
+  return result;
+}
+
+bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+    catalog_update->created_partitions.insert(partition.first);
+  }
+  return catalog_update->created_partitions.size() != 0;
+}
+
+Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
+    bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
+    RuntimeProfile* profile) {
+  lock_guard<mutex> l(lock_);
+  PermissionCache permissions_cache;
+  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
+  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
+
+  // INSERT finalization happens in the five following steps
+  // 1. If OVERWRITE, remove all the files in the target directory
+  // 2. Create all the necessary partition directories.
+
+  // Loop over all partitions that were updated by this insert, and create the set of
+  // filesystem operations required to create the correct partition structure on disk.
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    // INSERT allows writes to tables that have partitions on multiple filesystems.
+    // So we need to open connections to different filesystems as necessary. We use a
+    // local connection cache and populate it with one connection per filesystem that the
+    // partitions are on.
+    hdfsFS partition_fs_connection;
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+            partition.second.partition_base_dir, &partition_fs_connection,
+            &filesystem_connection_cache));
+
+    // Look up the partition in the descriptor table.
+    stringstream part_path_ss;
+    if (partition.second.id == -1) {
+      // If this is a non-existant partition, use the default partition location of
+      // <base_dir>/part_key_1=val/part_key_2=val/...
+      part_path_ss << params.hdfs_base_dir << "/" << partition.first;
+    } else {
+      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
+      DCHECK(part != nullptr)
+          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id;
+      part_path_ss << part->location();
+    }
+    const string& part_path = part_path_ss.str();
+    bool is_s3_path = IsS3APath(part_path.c_str());
+
+    // If this is an overwrite insert, we will need to delete any updated partitions
+    if (params.is_overwrite) {
+      if (partition.first.empty()) {
+        // If the root directory is written to, then the table must not be partitioned
+        DCHECK(per_partition_status_.size() == 1);
+        // We need to be a little more careful, and only delete data files in the root
+        // because the tmp directories the sink(s) wrote are there also.
+        // So only delete files in the table directory - all files are treated as data
+        // files by Hive and Impala, but directories are ignored (and may legitimately
+        // be used to store permanent non-table data by other applications).
+        int num_files = 0;
+        // hfdsListDirectory() only sets errno if there is an error, but it doesn't set
+        // it to 0 if the call succeed. When there is no error, errno could be any
+        // value. So need to clear errno before calling it.
+        // Once HDFS-8407 is fixed, the errno reset won't be needed.
+        errno = 0;
+        hdfsFileInfo* existing_files =
+            hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+        if (existing_files == nullptr && errno == EAGAIN) {
+          errno = 0;
+          existing_files =
+              hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+        }
+        // hdfsListDirectory() returns nullptr not only when there is an error but also
+        // when the directory is empty(HDFS-8407). Need to check errno to make sure
+        // the call fails.
+        if (existing_files == nullptr && errno != 0) {
+          return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
+        }
+        for (int i = 0; i < num_files; ++i) {
+          const string filename =
+              boost::filesystem::path(existing_files[i].mName).filename().string();
+          if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
+            partition_create_ops.Add(DELETE, existing_files[i].mName);
+          }
+        }
+        hdfsFreeFileInfo(existing_files, num_files);
+      } else {
+        // This is a partition directory, not the root directory; we can delete
+        // recursively with abandon, after checking that it ever existed.
+        // TODO: There's a potential race here between checking for the directory
+        // and a third-party deleting it.
+        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+          // There is no directory structure in S3, so "inheriting" permissions is not
+          // possible.
+          // TODO: Try to mimic inheriting permissions for S3.
+          PopulatePathPermissionCache(
+              partition_fs_connection, part_path, &permissions_cache);
+        }
+        // S3 doesn't have a directory structure, so we technically wouldn't need to
+        // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
+        // carrying out an operation on that path. So we still need to call CREATE_DIR
+        // before we access that path due to this limitation.
+        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
+          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
+        } else {
+          // Otherwise just create the directory.
+          partition_create_ops.Add(CREATE_DIR, part_path);
+        }
+      }
+    } else if (!is_s3_path || !s3_skip_insert_staging) {
+      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
+      // would have already been created by the table sinks.
+      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+        PopulatePathPermissionCache(
+            partition_fs_connection, part_path, &permissions_cache);
+      }
+      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
+        partition_create_ops.Add(CREATE_DIR, part_path);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    if (!partition_create_ops.Execute(
+            ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
+        // It's ok to ignore errors creating the directories, since they may already
+        // exist. If there are permission errors, we'll run into them later.
+        if (err.first->op() != CREATE_DIR) {
+          return Status(Substitute(
+                  "Error(s) deleting partition directories. First error (of $0) was: $1",
+                  partition_create_ops.errors().size(), err.second));
+        }
+      }
+    }
+  }
+
+  // 3. Move all tmp files
+  HdfsOperationSet move_ops(&filesystem_connection_cache);
+  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
+
+  for (FileMoveMap::value_type& move: files_to_move_) {
+    // Empty destination means delete, so this is a directory. These get deleted in a
+    // separate pass to ensure that we have moved all the contents of the directory first.
+    if (move.second.empty()) {
+      VLOG_ROW << "Deleting file: " << move.first;
+      dir_deletion_ops.Add(DELETE, move.first);
+    } else {
+      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
+      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+        move_ops.Add(RENAME, move.first, move.second);
+      } else {
+        move_ops.Add(MOVE, move.first, move.second);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", "FinalizationTimer"));
+    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) moving partition files. First error (of "
+         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 4. Delete temp directories
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", "FinalizationTimer"));
+    if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) deleting staging directories. First error (of "
+         << dir_deletion_ops.errors().size() << ") was: "
+         << dir_deletion_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 5. Optionally update the permissions of the created partition directories
+  // Do this last so that we don't make a dir unwritable before we write to it.
+  if (FLAGS_insert_inherit_permissions) {
+    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
+    for (const PermissionCache::value_type& perm: permissions_cache) {
+      bool new_dir = perm.second.first;
+      if (new_dir) {
+        short permissions = perm.second.second;
+        VLOG_QUERY << "INSERT created new directory: " << perm.first
+                   << ", inherited permissions are: " << oct << permissions;
+        chmod_ops.Add(CHMOD, perm.first, permissions);
+      }
+    }
+    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) setting permissions on newly created partition directories. First"
+         << " error (of " << chmod_ops.errors().size() << ") was: "
+         << chmod_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
+    PermissionCache* permissions_cache) {
+  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
+  // location (e.g. host:port) if so.
+  int scheme_end = path_str.find("://");
+  string stripped_str;
+  if (scheme_end != string::npos) {
+    // Skip past the subsequent location:port/ prefix.
+    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
+  } else {
+    stripped_str = path_str;
+  }
+
+  // Get the list of path components, used to build all path prefixes.
+  vector<string> components;
+  split(components, stripped_str, is_any_of("/"));
+
+  // Build a set of all prefixes (including the complete string) of stripped_path. So
+  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
+  vector<string> prefixes;
+  // Stores the current prefix
+  stringstream accumulator;
+  for (const string& component: components) {
+    if (component.empty()) continue;
+    accumulator << "/" << component;
+    prefixes.push_back(accumulator.str());
+  }
+
+  // Now for each prefix, stat() it to see if a) it exists and b) if so what its
+  // permissions are. When we meet a directory that doesn't exist, we record the fact that
+  // we need to create it, and the permissions of its parent dir to inherit.
+  //
+  // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
+  // for each path. If we need to create the directory, we record it as the pair (true,
+  // perms) so that the caller can identify which directories need their permissions
+  // explicitly set.
+
+  // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
+  // current dir doesn't exist).
+  short permissions = 0;
+  for (const string& path: prefixes) {
+    PermissionCache::const_iterator it = permissions_cache->find(path);
+    if (it == permissions_cache->end()) {
+      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
+      if (info != nullptr) {
+        // File exists, so fill the cache with its current permissions.
+        permissions_cache->insert(
+            make_pair(path, make_pair(false, info->mPermissions)));
+        permissions = info->mPermissions;
+        hdfsFreeFileInfo(info, 1);
+      } else {
+        // File doesn't exist, so we need to set its permissions to its immediate parent
+        // once it's been created.
+        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
+      }
+    } else {
+      permissions = it->second.second;
+    }
+  }
+}
+
+bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+  lock_guard<mutex> l(lock_);
+  bool set_thrift = false;
+  if (files_to_move_.size() > 0) {
+    dml_status->__set_files_to_move(files_to_move_);
+    set_thrift = true;
+  }
+  if (per_partition_status_.size() > 0) {
+    dml_status->__set_per_partition_status(per_partition_status_);
+    set_thrift = true;
+  }
+  return set_thrift;
+}
+
+void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
+  lock_guard<mutex> l(lock_);
+  int64_t num_row_errors = 0;
+  bool has_kudu_stats = false;
+  for (const PartitionStatusMap::value_type& v: per_partition_status_) {
+    insert_result->rows_modified[v.first] = v.second.num_modified_rows;
+    if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+      has_kudu_stats = true;
+    }
+    num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+  }
+  if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+}
+
+void DmlExecState::AddPartition(
+    const string& name, int64_t id, const string* base_dir) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(id);
+  status.__isset.stats = true;
+  if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+  per_partition_status_.insert(make_pair(name, status));
+}
+
+void DmlExecState::UpdatePartition(const string& partition_name,
+    int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.num_modified_rows += num_modified_rows_delta;
+  if (insert_stats == nullptr) return;
+  MergeDmlStats(*insert_stats, &entry->second.stats);
+}
+
+void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
+  dst->bytes_written += src.bytes_written;
+  if (src.__isset.kudu_stats) {
+    dst->__isset.kudu_stats = true;
+    if (!dst->kudu_stats.__isset.num_row_errors) {
+      dst->kudu_stats.__set_num_row_errors(0);
+    }
+    dst->kudu_stats.__set_num_row_errors(
+        dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+  }
+  if (src.__isset.parquet_stats) {
+    if (dst->__isset.parquet_stats) {
+      MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
+          &dst->parquet_stats.per_column_size);
+    } else {
+      dst->__set_parquet_stats(src.parquet_stats);
+    }
+  }
+}
+
+void DmlExecState::InitForKuduDml() {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(-1L);
+  status.__isset.stats = true;
+  status.stats.__isset.kudu_stats = true;
+  per_partition_status_.insert(make_pair(partition_name, status));
+}
+
+void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+    int64_t latest_ts) {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.__set_num_modified_rows(num_modified_rows);
+  entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
+  entry->second.__set_kudu_latest_observed_ts(latest_ts);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
new file mode 100644
index 0000000..728284a
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DML_EXEC_STATE_H
+#define IMPALA_RUNTIME_DML_EXEC_STATE_H
+
+#include <string>
+#include <map>
+#include <boost/unordered_map.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+
+namespace impala {
+
+class TInsertExecStatus;
+class TInsertResult;
+class TInsertStats;
+class TFinalizeParams;
+class TUpdateCatalogRequest;
+class TInsertPartitionStatus;
+class RuntimeProfile;
+class HdfsTableDescriptor;
+
+/// DmlExecState manages the state related to the execution of a DML statement
+/// (creation of new files, new partitions, etc.).
+///
+/// During DML execution, the table sink adds per-partition status using AddPartition()
+/// and then UpdatePartition() for non-Kudu tables.  For Kudu tables, the sink adds DML
+/// stats using InitForKuduDml() followed by SetKuduDmlStats().  In the case of the
+/// HDFS sink, it will also record the collection of files that should be moved by the
+/// coordinator on finalization using AddFileToMove().
+///
+/// The state is then serialized to thrift and merged at the coordinator using
+/// Update().  The coordinator will then use OutputPartitionStats(),
+/// GetKuduLatestObservedTimestamp(), PrepareCatalogUpdate() and FinalizeHdfsInsert()
+/// to perform various finalization tasks.
+///
+
+/// Thread-safe.
+class DmlExecState {
+ public:
+  /// Merge values from 'dml_exec_status'.
+  void Update(const TInsertExecStatus& dml_exec_status);
+
+  /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
+  /// It is an error to call this for an existing partition.
+  void AddPartition(const std::string& name, int64_t id, const std::string* base_dir);
+
+  /// Merge given values into stats for partition with name 'partition_name'.
+  /// Ignores 'insert_stats' if nullptr.
+  /// Requires that the partition already exist.
+  void UpdatePartition(const std::string& partition_name,
+      int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
+
+  /// Used to initialize this state when execute Kudu DML. Must be called before
+  /// SetKuduDmlStats().
+  void InitForKuduDml();
+
+  /// Update stats for a Kudu DML sink. Requires that InitForKuduDml() was already called.
+  void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+      int64_t latest_ts);
+
+  /// Adds new file/location to the move map.
+  void AddFileToMove(const std::string& file_name, const std::string& location);
+
+  /// Outputs the partition stats to a string.
+  std::string OutputPartitionStats(const std::string& prefix);
+
+  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
+  /// was executed, or 0 if there were no Kudu timestamps reported.
+  uint64_t GetKuduLatestObservedTimestamp();
+
+  /// Return the total number of modified rows across all partitions.
+  int64_t GetNumModifiedRows();
+
+  /// Populates 'catalog_update' with PartitionStatusMap data.
+  /// Returns true if a catalog update is required, false otherwise.
+  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+
+  /// For HDFS (and other Hadoop FileSystem) INSERT, moves all temporary staging files
+  /// to their final destinations, as indicated by 'params', and creates new partitions
+  /// for 'hdfs_table' as required.  Adds child timers to profile for the various
+  /// stages of finalization.  If the table is on an S3 path and
+  /// 's3_skip_insert_staging' is true, does not create new partition directories.
+  Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
+      HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;
+
+  // Serialize to thrift. Returns true if any fields of 'dml_status' were set.
+  bool ToThrift(TInsertExecStatus* dml_status);
+
+  // Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
+  // Beeswax.
+  void ToTInsertResult(TInsertResult* insert_result);
+
+ private:
+  // protects all fields below
+  boost::mutex lock_;
+
+  /// Counts how many rows an DML query has added to a particular partition (partitions
+  /// are identified by their partition keys: k1=v1/k2=v2 etc. Unpartitioned tables
+  /// have a single 'default' partition which is identified by ROOT_PARTITION_KEY.
+  /// Uses ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
+  PartitionStatusMap per_partition_status_;
+
+  /// Tracks files to move from a temporary (key) to a final destination (value) as
+  /// part of query finalization. If the destination is empty, the file is to be
+  /// deleted.  Uses ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, std::string> FileMoveMap;
+  FileMoveMap files_to_move_;
+
+  /// Determines what the permissions of directories created by INSERT statements should
+  /// be if permission inheritance is enabled. Populates a map from all prefixes of
+  /// 'path_str' (including the full path itself) which is a path in Hdfs, to pairs
+  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
+  /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
+  /// the most immediate ancestor of the path that does exist, i.e. the permissions that
+  /// the path should inherit when created. Otherwise permissions is set to the actual
+  /// permissions of the path. The PermissionCache argument is also used to cache the
+  /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
+  /// same path.
+  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
+  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
+      PermissionCache* permissions_cache);
+
+  /// Merge 'src' into 'dst'. Not thread-safe.
+  void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1cc646d..ad5748f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -236,18 +236,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     // Only send updates to insert status if fragment is finished, the coordinator waits
     // until query execution is done to use them anyhow.
     RuntimeState* state = fis->runtime_state();
-    if (done && (state->hdfs_files_to_move()->size() > 0
-        || state->per_partition_status()->size() > 0)) {
-      TInsertExecStatus insert_status;
-      if (state->hdfs_files_to_move()->size() > 0) {
-        insert_status.__set_files_to_move(*state->hdfs_files_to_move());
-      }
-      if (state->per_partition_status()->size() > 0) {
-        insert_status.__set_per_partition_status(*state->per_partition_status());
-      }
-      params.__set_insert_exec_status(insert_status);
+    if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) {
+      params.__isset.insert_exec_status = true;
     }
-
     // Send new errors to coordinator
     state->GetUnreportedErrors(&params.error_log);
     params.__isset.error_log = (params.error_log.size() > 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 239e066..4e23a42 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 7edd718..5dc3a3f 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
 #include "runtime/thread-resource-mgr.h"
+#include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
@@ -56,22 +57,6 @@ namespace io {
   class DiskIoMgr;
 }
 
-/// TODO: move the typedefs into a separate .h (and fix the includes for that)
-
-/// Counts how many rows an INSERT query has added to a particular partition
-/// (partitions are identified by their partition keys: k1=v1/k2=v2
-/// etc. Unpartitioned tables have a single 'default' partition which is
-/// identified by ROOT_PARTITION_KEY.
-typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
-
-/// Stats per partition for insert queries. They key is the same as for PartitionRowCount
-typedef std::map<std::string, TInsertStats> PartitionInsertStats;
-
-/// Tracks files to move from a temporary (key) to a final destination (value) as
-/// part of query finalization. If the destination is empty, the file is to be
-/// deleted.
-typedef std::map<std::string, std::string> FileMoveMap;
-
 /// A collection of items that are part of the global state of a query and shared across
 /// all execution nodes of that query. After initialisation, callers must call
 /// ReleaseResources() to ensure that all resources are correctly freed before
@@ -136,8 +121,6 @@ class RuntimeState {
   }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
-  FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
     root_node_id_ = id;
@@ -149,7 +132,7 @@ class RuntimeState {
 
   RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
-  PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return profile_; }
@@ -344,12 +327,8 @@ class RuntimeState {
   /// state is responsible for returning this pool to the thread mgr.
   ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
 
-  /// Temporary Hdfs files created, and where they should be moved to ultimately.
-  /// Mapping a filename to a blank destination causes it to be deleted.
-  FileMoveMap hdfs_files_to_move_;
-
-  /// Records summary statistics for the results of inserts into Hdfs partitions.
-  PartitionStatusMap per_partition_status_;
+  /// Execution state for DML statements.
+  DmlExecState dml_exec_state_;
 
   RuntimeProfile* const profile_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 29c493e..e5d5e13 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -569,7 +570,8 @@ void ClientRequestState::Done() {
   // must happen before taking lock_ below.
   if (coord_.get() != NULL) {
     // This is safe to access on coord_ after Wait() has been called.
-    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    uint64_t latest_kudu_ts =
+        coord_->dml_exec_state()->GetKuduLatestObservedTimestamp();
     if (latest_kudu_ts > 0) {
       VLOG_RPC << "Updating session (id=" << session_id()  << ") with latest "
                << "observed Kudu timestamp: " << latest_kudu_ts;
@@ -917,7 +919,7 @@ Status ClientRequestState::UpdateCatalog() {
     catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
-    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+    if (!coord()->dml_exec_state()->PrepareCatalogUpdate(&catalog_update)) {
       VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                  << query_id() << ")";
     } else {
@@ -1026,9 +1028,7 @@ void ClientRequestState::SetCreateTableAsSelectResultSet() {
   // operation.
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
-    for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_modified_rows;
-    }
+    total_num_rows_inserted = coord_->dml_exec_state()->GetNumModifiedRows();
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
   VLOG_QUERY << summary_msg;

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0c05bc4..657f3de 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -24,6 +24,7 @@
 #include "scheduling/query-schedule.h"
 #include "service/child-query.h"
 #include "service/impala-server.h"
+#include "service/query-result-set.h"
 #include "util/auth-util.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b827ff3..c441285 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -20,10 +20,12 @@
 #include "common/logging.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/auth-util.h"
@@ -563,22 +565,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
       // need to revisit this, since that might lead us to insert a row without a
       // coordinator, depending on how we choose to drive the table sink.
-      int64_t num_row_errors = 0;
-      bool has_kudu_stats = false;
       if (request_state->coord() != nullptr) {
-        for (const PartitionStatusMap::value_type& v:
-             request_state->coord()->per_partition_status()) {
-          const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_modified[partition_status.first] =
-              partition_status.second.num_modified_rows;
-
-          if (partition_status.second.__isset.stats &&
-              partition_status.second.stats.__isset.kudu_stats) {
-            has_kudu_stats = true;
-          }
-          num_row_errors += partition_status.second.stats.kudu_stats.num_row_errors;
-        }
-        if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+        request_state->coord()->dml_exec_state()->ToTInsertResult(insert_result);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4381f81..80ace87 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -34,6 +34,7 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/raw-value.h"
 #include "runtime/exec-env.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/e882cbb9/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 43b03d1..3841bfe 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,6 +24,7 @@
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -31,6 +32,7 @@
 #include "runtime/timestamp-value.inline.h"
 #include "service/impala-server.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"