Posted to commits@impala.apache.org by kw...@apache.org on 2018/03/29 23:30:45 UTC

[1/5] impala git commit: IMPALA-6731: Move execnet Python dependency to stage 2

Repository: impala
Updated Branches:
  refs/heads/master 8091b2f46 -> 1c4775d92


IMPALA-6731: Move execnet Python dependency to stage 2

It seems that execnet also cannot be installed together with
setuptools-scm if only a local mirror and index are available
(similar to https://github.com/pywebhdfs/pywebhdfs/issues/52).

Testing: Observed that execnet failed to install during
bootstrap_toolchain.py on a CentOS 6.4 EC2 instance at 5:02pm (within the
brownout period). With this change, bootstrap_toolchain.py succeeded.
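
For readers unfamiliar with the two-stage layout, a minimal sketch of the
idea: packages whose setup.py needs a build-time helper (setuptools-scm,
pbr) are deferred to a second pip pass, so the helper is already installed
by the time they are built from a local mirror. The mirror path and driver
code below are illustrative assumptions, not bootstrap_toolchain.py itself.

import subprocess
import sys

# Hypothetical local package mirror; --no-index keeps pip off the network index.
LOCAL_MIRROR = "infra/python/deps/pkgs"  # illustrative path, not the real layout

def pip_install(requirements_file):
    # Install strictly from the local mirror, never a remote index.
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "--no-index", "--find-links", LOCAL_MIRROR,
        "-r", requirements_file,
    ])

# Stage 1 installs build-time helpers such as setuptools-scm.
pip_install("infra/python/deps/requirements.txt")
# Stage 2 installs packages (here: hdfs, docopt, execnet) whose setup needs
# those helpers to already be importable.
pip_install("infra/python/deps/stage2-requirements.txt")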

Change-Id: Ic949edcc03f0e068bdd84b6ede487e64dcf2439b
Reviewed-on: http://gerrit.cloudera.org:8080/9850
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2194dfd0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2194dfd0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2194dfd0

Branch: refs/heads/master
Commit: 2194dfd0ff63a51c1b17b1bbcd78895d0c2d6951
Parents: 8091b2f
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Mar 28 16:58:18 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 04:38:31 2018 +0000

----------------------------------------------------------------------
 infra/python/deps/requirements.txt        | 3 ---
 infra/python/deps/stage2-requirements.txt | 3 +++
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2194dfd0/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index bea16f4..d55bc19 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -39,9 +39,6 @@ Flask == 0.10.1
   MarkupSafe == 0.23
   Werkzeug == 0.11.3
   itsdangerous == 0.24
-hdfs == 2.0.2
-  docopt == 0.6.2
-  execnet == 1.4.0
 kazoo == 2.2.1
 ordereddict == 1.1
 pexpect == 3.3

http://git-wip-us.apache.org/repos/asf/impala/blob/2194dfd0/infra/python/deps/stage2-requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/stage2-requirements.txt b/infra/python/deps/stage2-requirements.txt
index eda2cd3..c7c947c 100644
--- a/infra/python/deps/stage2-requirements.txt
+++ b/infra/python/deps/stage2-requirements.txt
@@ -27,6 +27,9 @@ pytest == 2.9.2
   pytest-random == 0.02
   pytest-runner == 4.2
   pytest-xdist == 1.17.1
+hdfs == 2.0.2
+  docopt == 0.6.2
+  execnet == 1.4.0
 
 # Requires pbr
 pywebhdfs == 0.3.2


[3/5] impala git commit: IMPALA-5384, part 1: introduce DmlExecState

Posted by kw...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6c62a19..5eeb52a 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -49,6 +49,7 @@
 #include "rpc/thrift-thread.h"
 #include "rpc/thrift-util.h"
 #include "runtime/client-cache.h"
+#include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
@@ -59,6 +60,7 @@
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 23d4687..2af89fd 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -34,7 +34,6 @@
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-server.h"
 #include "common/status.h"
-#include "service/frontend.h"
 #include "service/query-options.h"
 #include "util/condition-variable.h"
 #include "util/metrics.h"
@@ -43,8 +42,6 @@
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "runtime/coordinator.h"
-#include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/types.h"
 #include "statestore/statestore-subscriber.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 7ff44a8..19afcd5 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -29,6 +29,7 @@
 #include "util/metrics.h"
 #include "util/openssl-util.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"


[5/5] impala git commit: IMPALA-6760: Fix for py2.7-ism in run-tests.py.

Posted by kw...@apache.org.
IMPALA-6760: Fix for py2.7-ism in run-tests.py.

A set literal snuck into run-tests.py in a recent
change. We want to avoid these so that the script can
still run on py2.6.
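
For context: set literals ({x}) only exist from Python 2.7 on, so code that
must still run under 2.6 builds sets via set() and set.add(), which is what
the one-line diff below does. A small standalone illustration with made-up
node ids:

# Set literals were added in Python 2.7; on 2.6 the next line is a SyntaxError.
#     tests_collected.update({item.nodeid})

# Equivalent code that also runs on Python 2.6:
tests_collected = set()
for nodeid in ["test_a.py::one", "test_b.py::two", "test_a.py::one"]:  # illustrative ids
    tests_collected.add(nodeid)

assert tests_collected == set(["test_a.py::one", "test_b.py::two"])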

Change-Id: I81928d1880a493b91abb13b3a8149568c9789f66
Reviewed-on: http://gerrit.cloudera.org:8080/9843
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Philip Zeyliger <ph...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1c4775d9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1c4775d9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1c4775d9

Branch: refs/heads/master
Commit: 1c4775d92abab73f01cc967967aebeacc4ead5de
Parents: 408ee4d
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Wed Mar 28 15:15:42 2018 -0700
Committer: Philip Zeyliger <ph...@cloudera.com>
Committed: Thu Mar 29 15:35:19 2018 +0000

----------------------------------------------------------------------
 tests/run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1c4775d9/tests/run-tests.py
----------------------------------------------------------------------
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 6967b17..9552d0d 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -80,7 +80,7 @@ class TestCounterPlugin(object):
   # https://docs.pytest.org/en/2.9.2/writing_plugins.html#_pytest.hookspec.pytest_collection_modifyitems
   def pytest_collection_modifyitems(self, items):
       for item in items:
-          self.tests_collected.update({item.nodeid})
+          self.tests_collected.add(item.nodeid)
 
   # link to pytest_runtest_logreport
   # https://docs.pytest.org/en/2.9.2/_modules/_pytest/hookspec.html#pytest_runtest_logreport


[2/5] impala git commit: Revert "IMPALA-6389: Make '\0' delimited text files work"

Posted by kw...@apache.org.
Revert "IMPALA-6389: Make '\0' delimited text files work"

This reverts commit c2bdaf8af4cf35d3462595c2a341ed84dcf5d960.

An ASAN issue and potentially other problems have been found;
reverting to unbreak the build and tests.

Change-Id: If581311033de8c26e33316b19192c4579594f261
Reviewed-on: http://gerrit.cloudera.org:8080/9851
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Zach Amsden <za...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b78daedf
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b78daedf
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b78daedf

Branch: refs/heads/master
Commit: b78daedf52bdaf5028bae90f157d37b6c5bea2c6
Parents: 2194dfd
Author: Zach Amsden <za...@cloudera.com>
Authored: Thu Mar 29 03:51:41 2018 +0000
Committer: Zach Amsden <za...@cloudera.com>
Committed: Thu Mar 29 04:59:48 2018 +0000

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser-test.cc  | 56 ++++---------------
 be/src/exec/delimited-text-parser.cc       | 74 ++++++++-----------------
 be/src/exec/delimited-text-parser.h        | 43 +++++---------
 be/src/exec/delimited-text-parser.inline.h | 70 +++++++++++------------
 be/src/exec/hdfs-sequence-scanner.cc       |  2 +-
 be/src/exec/hdfs-sequence-scanner.h        |  3 +-
 be/src/exec/hdfs-text-scanner.cc           |  2 +-
 be/src/exec/hdfs-text-scanner.h            |  3 +-
 8 files changed, 84 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser-test.cc b/be/src/exec/delimited-text-parser-test.cc
index d8e977d..3156b36 100644
--- a/be/src/exec/delimited-text-parser-test.cc
+++ b/be/src/exec/delimited-text-parser-test.cc
@@ -24,7 +24,7 @@
 
 namespace impala {
 
-void Validate(TupleDelimitedTextParser* parser, const string& data,
+void Validate(DelimitedTextParser* parser, const string& data,
     int expected_offset, char tuple_delim, int expected_num_tuples,
     int expected_num_fields) {
   parser->ParserReset();
@@ -72,8 +72,8 @@ TEST(DelimitedTextParser, Basic) {
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
-                                            TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+  DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+                                       TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
   // Note that only complete tuples "count"
   Validate(&no_escape_parser, "no_delims", -1, TUPLE_DELIM, 0, 0);
   Validate(&no_escape_parser, "abc||abc", 4, TUPLE_DELIM, 1, 1);
@@ -81,9 +81,9 @@ TEST(DelimitedTextParser, Basic) {
   Validate(&no_escape_parser, "a|bcd", 2, TUPLE_DELIM, 0, 0);
 
   // Test with escape char
-  TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
-                                         TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
-                                         ESCAPE_CHAR);
+  DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+                                    TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+                                    ESCAPE_CHAR);
   Validate(&escape_parser, "a@|a|bcd", 5, TUPLE_DELIM, 0, 0);
   Validate(&escape_parser, "a@@|a|bcd", 4, TUPLE_DELIM, 1, 1);
   Validate(&escape_parser, "a@@@|a|bcd", 7, TUPLE_DELIM, 0, 0);
@@ -127,8 +127,8 @@ TEST(DelimitedTextParser, Fields) {
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
-                                            TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+  DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+                                       TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
 
   Validate(&no_escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
   Validate(&no_escape_parser, "b|c,d|e,f", 2, TUPLE_DELIM, 1, 3);
@@ -137,9 +137,9 @@ TEST(DelimitedTextParser, Fields) {
   const string str10("a,\0|c,d|e", 9);
   Validate(&no_escape_parser, str10, 4, TUPLE_DELIM, 1, 2);
 
-  TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
-                                         TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
-                                         ESCAPE_CHAR);
+  DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+                                    TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+                                    ESCAPE_CHAR);
 
   Validate(&escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
   Validate(&escape_parser, "a,@|c|e,f", 6, TUPLE_DELIM, 0, 1);
@@ -148,20 +148,14 @@ TEST(DelimitedTextParser, Fields) {
 
 TEST(DelimitedTextParser, SpecialDelimiters) {
   const char TUPLE_DELIM = '\n'; // implies '\r' and "\r\n" are also delimiters
-  const char NUL_DELIM = '\0';
   const int NUM_COLS = 1;
 
   bool is_materialized_col[NUM_COLS];
   for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
 
-  TupleDelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
+  DelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
       TUPLE_DELIM);
 
-  TupleDelimitedTextParser nul_delim_parser(NUM_COLS, 0, is_materialized_col, NUL_DELIM);
-
-  TupleDelimitedTextParser nul_field_parser(2, 0, is_materialized_col,
-                                            TUPLE_DELIM, NUL_DELIM);
-
   // Non-SSE case
   Validate(&tuple_delim_parser, "A\r\nB", 3, TUPLE_DELIM, 0, 0);
   Validate(&tuple_delim_parser, "A\rB", 2, TUPLE_DELIM, 0, 0);
@@ -171,16 +165,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
   Validate(&tuple_delim_parser, "A\rB\nC\r\nD", 2, TUPLE_DELIM, 2, 2);
   Validate(&tuple_delim_parser, "\r\r\n\n", 1, TUPLE_DELIM, 2, 2);
 
-  // NUL tuple delimiter; no field delimiter
-  const string nul1("\0\0\0", 3);
-  const string nul2("AAA\0BBB\0", 8);
-  const string nul3("\n\0\r\0\r\n\0", 7);
-  const string nul4("\n\0\r\0\r\n", 6);
-  Validate(&nul_delim_parser, nul1, 1, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nul2, 4, NUL_DELIM, 1, 1);
-  Validate(&nul_delim_parser, nul3, 2, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nul4, 2, NUL_DELIM, 1, 1);
-
   // SSE case
   string data = "\rAAAAAAAAAAAAAAA";
   DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -194,22 +178,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
   data = "\r\nAAA\n\r\r\nAAAAAAA";
   DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
   Validate(&tuple_delim_parser, data, 2, TUPLE_DELIM, 3, 3);
-
-  // NUL SSE case
-  const string nulsse1("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0\0", 32);
-  const string nulsse2("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0A", 32);
-  const string nulsse3("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,ddd\0", 36);
-  const string nulsse4("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,dddd", 36);
-  Validate(&nul_delim_parser, nulsse1, 6, NUL_DELIM, 3, 3);
-  Validate(&nul_delim_parser, nulsse2, 6, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nulsse3, 4, NUL_DELIM, 2, 2);
-  Validate(&nul_delim_parser, nulsse4, 4, NUL_DELIM, 1, 1);
-
-  // NUL Field delimiters
-  const string field1("\na\0b\0c\n", 7);
-  const string field2("aaaa\na\0b\0c\naaaaa\0b\na\0b\0c\n", 25);
-  Validate(&nul_field_parser, field1, 1, TUPLE_DELIM, 1, 2);
-  Validate(&nul_field_parser, field2, 5, TUPLE_DELIM, 3, 6);
 }
 
 // TODO: expand test for other delimited text parser functions/cases.

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 7db65fd..18fcde1 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -24,8 +24,7 @@
 
 using namespace impala;
 
-template<bool DELIMITED_TUPLES>
-DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
+DelimitedTextParser::DelimitedTextParser(
     int num_cols, int num_partition_keys, const bool* is_materialized_col,
     char tuple_delim, char field_delim, char collection_item_delim, char escape_char)
     : is_materialized_col_(is_materialized_col),
@@ -73,7 +72,7 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
     memset(low_mask_, 0, sizeof(low_mask_));
   }
 
-  if (DELIMITED_TUPLES) {
+  if (tuple_delim != '\0') {
     search_chars[num_delims_++] = tuple_delim_;
     ++num_tuple_delims_;
     // Hive will treats \r (^M) as an alternate tuple delimiter, but \r\n is a
@@ -83,12 +82,12 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
       ++num_tuple_delims_;
     }
     xmm_tuple_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
-    if (field_delim_ != tuple_delim_) search_chars[num_delims_++] = field_delim_;
-  } else {
-    search_chars[num_delims_++] = field_delim_;
   }
 
-  if (collection_item_delim != '\0') search_chars[num_delims_++] = collection_item_delim_;
+  if (field_delim != '\0' || collection_item_delim != '\0') {
+    search_chars[num_delims_++] = field_delim_;
+    search_chars[num_delims_++] = collection_item_delim_;
+  }
 
   DCHECK_GT(num_delims_, 0);
   xmm_delim_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
@@ -96,30 +95,16 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
   ParserReset();
 }
 
-template
-DelimitedTextParser<true>::DelimitedTextParser(
-    int num_cols, int num_partition_keys, const bool* is_materialized_col,
-    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template
-DelimitedTextParser<false>::DelimitedTextParser(
-    int num_cols, int num_partition_keys, const bool* is_materialized_col,
-    char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template<bool DELIMITED_TUPLES>
-void DelimitedTextParser<DELIMITED_TUPLES>::ParserReset() {
+void DelimitedTextParser::ParserReset() {
   current_column_has_escape_ = false;
   last_char_is_escape_ = false;
   last_row_delim_offset_ = -1;
   column_idx_ = num_partition_keys_;
 }
 
-template void DelimitedTextParser<true>::ParserReset();
-
 // Parsing raw csv data into FieldLocation descriptors.
-template<bool DELIMITED_TUPLES>
-Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
+Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remaining_len,
+    char** byte_buffer_ptr, char** row_end_locations,
     FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
   // Start of this batch.
@@ -148,10 +133,10 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
   while (remaining_len > 0) {
     bool new_tuple = false;
     bool new_col = false;
-    if (DELIMITED_TUPLES) unfinished_tuple_ = true;
+    unfinished_tuple_ = true;
 
     if (!last_char_is_escape_) {
-      if (DELIMITED_TUPLES && (**byte_buffer_ptr == tuple_delim_ ||
+      if (tuple_delim_ != '\0' && (**byte_buffer_ptr == tuple_delim_ ||
            (tuple_delim_ == '\n' && **byte_buffer_ptr == '\r'))) {
         new_tuple = true;
         new_col = true;
@@ -181,7 +166,6 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
         row_end_locations[*num_tuples] = *byte_buffer_ptr;
         ++(*num_tuples);
       }
-      DCHECK(DELIMITED_TUPLES);
       unfinished_tuple_ = false;
       last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
       if (*num_tuples == max_tuples) {
@@ -201,7 +185,7 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
 
   // For formats that store the length of the row, the row is not delimited:
   // e.g. Sequence files.
-  if (!DELIMITED_TUPLES) {
+  if (tuple_delim_ == '\0') {
     DCHECK_EQ(remaining_len, 0);
     RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
         next_column_start, num_fields, field_locations));
@@ -209,30 +193,18 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
     DCHECK(status.ok());
     column_idx_ = num_partition_keys_;
     ++(*num_tuples);
+    unfinished_tuple_ = false;
   }
   return Status::OK();
 }
 
-template
-Status DelimitedTextParser<true>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
-    FieldLocation* field_locations,
-    int* num_tuples, int* num_fields, char** next_column_start);
-
-template
-Status DelimitedTextParser<false>::ParseFieldLocations(int max_tuples,
-    int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
-    FieldLocation* field_locations,
-    int* num_tuples, int* num_fields, char** next_column_start);
-
-template<bool DELIMITED_TUPLES>
-int64_t DelimitedTextParser<DELIMITED_TUPLES>::FindFirstInstance(const char* buffer,
-    int64_t len) {
+// Find the first instance of the tuple delimiter. This will find the start of the first
+// full tuple in buffer by looking for the end of the previous tuple.
+int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
   int64_t tuple_start = 0;
   const char* buffer_start = buffer;
   bool found = false;
 
-  DCHECK(DELIMITED_TUPLES);
   // If the last char in the previous buffer was \r then either return the start of
   // this buffer or skip a \n at the beginning of the buffer.
   if (last_row_delim_offset_ != -1) {
@@ -254,10 +226,13 @@ restart:
       int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0);
       if (tuple_mask != 0) {
         found = true;
-        // Find first set bit (1-based)
-        int i = ffs(tuple_mask);
-        tuple_start += i;
-        buffer += i;
+        for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) {
+          if ((tuple_mask & SSEUtil::SSE_BITMASK[i]) != 0) {
+            tuple_start += i + 1;
+            buffer += i + 1;
+            break;
+          }
+        }
         break;
       }
       tuple_start += SSEUtil::CHARS_PER_128_BIT_REGISTER;
@@ -320,6 +295,3 @@ restart:
   }
   return tuple_start;
 }
-
-template
-int64_t DelimitedTextParser<true>::FindFirstInstance(const char* buffer, int64_t len);

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index 9b89127..b966081 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -25,27 +25,22 @@
 
 namespace impala {
 
-template <bool DELIMITED_TUPLES>
 class DelimitedTextParser {
  public:
 
   /// The Delimited Text Parser parses text rows that are delimited by specific
   /// characters:
-  ///   tuple_delim: delimits tuples.  Only used if DELIMITED_TUPLES is true.
+  ///   tuple_delim: delimits tuples
   ///   field_delim: delimits fields
   ///   collection_item_delim: delimits collection items
   ///   escape_char: escape delimiters, make them part of the data.
-  ///
-  /// If the template parameter DELIMITED_TUPLES is false there is no support
-  /// for tuple delimiters and we do not need to search for them.  Any value
-  /// may be passed for tuple_delim, as it is ignored.
-  ///
+  //
   /// 'num_cols' is the total number of columns including partition keys.
-  ///
+  //
   /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
   /// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
   /// Owned by caller.
-  ///
+  //
   /// The main method is ParseData which fills in a vector of pointers and lengths to the
   /// fields.  It also can handle an escape character which masks a tuple or field
   /// delimiter that occurs in the data.
@@ -96,14 +91,14 @@ class DelimitedTextParser {
   /// This function is used to parse sequence file records which do not need to
   /// parse for tuple delimiters. Returns an error status if any column exceeds the
   /// size limit. See AddColumn() for details.
-  /// This function is disabled for non-sequence file parsing.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
       int* num_fields);
 
   /// FindFirstInstance returns the position after the first non-escaped tuple
   /// delimiter from the starting offset.
   /// Used to find the start of a tuple if jumping into the middle of a text file.
+  /// Also used to find the sync marker for Sequenced and RC files.
   /// If no tuple delimiter is found within the buffer, return -1;
   int64_t FindFirstInstance(const char* buffer, int64_t len);
 
@@ -124,16 +119,13 @@ class DelimitedTextParser {
   /// by the number fields added.
   /// 'field_locations' will be updated with the start and length of the fields.
   /// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status FillColumns(int64_t len, char** last_column, int* num_fields,
       impala::FieldLocation* field_locations);
 
   /// Return true if we have not seen a tuple delimiter for the current tuple being
   /// parsed (i.e., the last byte read was not a tuple delimiter).
-  bool HasUnfinishedTuple() {
-    DCHECK(DELIMITED_TUPLES);
-    return unfinished_tuple_;
-  }
+  bool HasUnfinishedTuple() { return unfinished_tuple_; }
 
  private:
   /// Initialize the parser state.
@@ -141,7 +133,7 @@ class DelimitedTextParser {
 
   /// Helper routine to add a column to the field_locations vector.
   /// Template parameter:
-  ///   PROCESS_ESCAPES -- if true the the column may have escape characters
+  ///   process_escapes -- if true the the column may have escape characters
   ///                      and the negative of the len will be stored.
   ///   len: length of the current column. The length of a column must fit in a 32-bit
   ///        signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
@@ -152,29 +144,23 @@ class DelimitedTextParser {
   /// Output:
   ///   field_locations: updated with start and length of current field.
   /// Return an error status if 'len' exceeds the size limit specified above.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
       FieldLocation* field_locations);
 
   /// Helper routine to parse delimited text using SSE instructions.
   /// Identical arguments as ParseFieldLocations.
-  /// If the template argument, 'PROCESS_ESCAPES' is true, this function will handle
+  /// If the template argument, 'process_escapes' is true, this function will handle
   /// escapes, otherwise, it will assume the text is unescaped.  By using templates,
   /// we can special case the un-escaped path for better performance.  The unescaped
   /// path is optimized away by the compiler. Returns an error status if the length
   /// of any column exceeds the size limit. See AddColumn() for details.
-  template <bool PROCESS_ESCAPES>
+  template <bool process_escapes>
   Status ParseSse(int max_tuples, int64_t* remaining_len,
       char** byte_buffer_ptr, char** row_end_locations_,
       FieldLocation* field_locations,
       int* num_tuples, int* num_fields, char** next_column_start);
 
-  bool IsFieldOrCollectionItemDelimiter(char c) {
-    return (!DELIMITED_TUPLES && c == field_delim_) ||
-      (DELIMITED_TUPLES && field_delim_ != tuple_delim_ && c == field_delim_) ||
-      (collection_item_delim_ != '\0' && c == collection_item_delim_);
-  }
-
   /// SSE(xmm) register containing the tuple search character(s).
   __m128i xmm_tuple_search_;
 
@@ -228,7 +214,7 @@ class DelimitedTextParser {
   /// Character delimiting collection items (to become slots).
   char collection_item_delim_;
 
-  /// Character delimiting tuples.  Only used if DELIMITED_TUPLES is true.
+  /// Character delimiting tuples.
   char tuple_delim_;
 
   /// Whether or not the current column has an escape character in it
@@ -242,8 +228,5 @@ class DelimitedTextParser {
   bool unfinished_tuple_;
 };
 
-using TupleDelimitedTextParser = DelimitedTextParser<true>;
-using SequenceDelimitedTextParser = DelimitedTextParser<false>;
-
 }// namespace impala
 #endif// IMPALA_EXEC_DELIMITED_TEXT_PARSER_H

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 9fe737e..02fa132 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -52,10 +52,9 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
   *delim_mask &= ~escape_mask;
 }
 
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
-    char** next_column_start, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
+    int* num_fields, FieldLocation* field_locations) {
   if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
     return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
   }
@@ -63,27 +62,26 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
     // Found a column that needs to be parsed, write the start/len to 'field_locations'
     field_locations[*num_fields].start = *next_column_start;
     int64_t field_len = len;
-    if (PROCESS_ESCAPES && current_column_has_escape_) {
+    if (process_escapes && current_column_has_escape_) {
       field_len = -len;
     }
     field_locations[*num_fields].len = static_cast<int32_t>(field_len);
     ++(*num_fields);
   }
-  if (PROCESS_ESCAPES) current_column_has_escape_ = false;
+  if (process_escapes) current_column_has_escape_ = false;
   *next_column_start += len + 1;
   ++column_idx_;
   return Status::OK();
 }
 
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
-    char** last_column, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
+    int* num_fields, FieldLocation* field_locations) {
   // Fill in any columns missing from the end of the tuple.
   char* dummy = NULL;
   if (last_column == NULL) last_column = &dummy;
   while (column_idx_ < num_cols_) {
-    RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(len, last_column,
+    RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
         num_fields, field_locations));
     // The rest of the columns will be null.
     last_column = &dummy;
@@ -105,9 +103,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
 ///  Needle   = 'abcd000000000000' (we're searching for any a's, b's, c's or d's)
 ///  Haystack = 'asdfghjklhjbdwwc' (the raw string)
 ///  Result   = '1010000000011001'
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSse(int max_tuples,
     int64_t* remaining_len, char** byte_buffer_ptr,
     char** row_end_locations, FieldLocation* field_locations,
     int* num_tuples, int* num_fields, char** next_column_start) {
@@ -149,7 +146,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
     uint16_t escape_mask = 0;
     // If the table does not use escape characters, skip processing for it.
-    if (PROCESS_ESCAPES) {
+    if (process_escapes) {
       DCHECK(escape_char_ != '\0');
       xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
           xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -159,10 +156,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
     char* last_char = *byte_buffer_ptr + 15;
     bool last_char_is_unescaped_delim = delim_mask >> 15;
-    if (DELIMITED_TUPLES) {
-      unfinished_tuple_ = !(last_char_is_unescaped_delim &&
-          (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
-    }
+    unfinished_tuple_ = !(last_char_is_unescaped_delim &&
+        (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
 
     int last_col_idx = 0;
     // Process all non-zero bits in the delim_mask from lsb->msb.  If a bit
@@ -175,7 +170,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
       // clear current bit
       delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         // Determine if there was an escape character between [last_col_idx, n]
         bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
         current_column_has_escape_ |= escaped;
@@ -184,14 +179,13 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 
       char* delim_ptr = *byte_buffer_ptr + n;
 
-      if (IsFieldOrCollectionItemDelimiter(*delim_ptr)) {
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+      if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
             next_column_start, num_fields, field_locations));
         continue;
       }
 
-      if (DELIMITED_TUPLES &&
-          (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r'))) {
+      if (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r')) {
         if (UNLIKELY(
                 last_row_delim_offset_ == *remaining_len - n && *delim_ptr == '\n')) {
           // If the row ended in \r\n then move the next start past the \n
@@ -199,7 +193,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
           last_row_delim_offset_ = -1;
           continue;
         }
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+        RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
             next_column_start, num_fields, field_locations));
         Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
         DCHECK(status.ok());
@@ -210,7 +204,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
         last_row_delim_offset_ = *delim_ptr == '\r' ? *remaining_len - n - 1 : -1;
         if (UNLIKELY(*num_tuples == max_tuples)) {
           (*byte_buffer_ptr) += (n + 1);
-          if (PROCESS_ESCAPES) last_char_is_escape_ = false;
+          if (process_escapes) last_char_is_escape_ = false;
           *remaining_len -= (n + 1);
           // If the last character we processed was \r then set the offset to 0
           // so that we will use it at the beginning of the next batch.
@@ -220,7 +214,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
       }
     }
 
-    if (PROCESS_ESCAPES) {
+    if (process_escapes) {
       // Determine if there was an escape character between (last_col_idx, 15)
       bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
       current_column_has_escape_ |= unprocessed_escape;
@@ -233,10 +227,9 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
 }
 
 /// Simplified version of ParseSSE which does not handle tuple delimiters.
-template<>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len,
-    char* buffer, FieldLocation* field_locations, int* num_fields) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+    FieldLocation* field_locations, int* num_fields) {
   char* next_column_start = buffer;
   __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
 
@@ -253,7 +246,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
 
       uint16_t escape_mask = 0;
       // If the table does not use escape characters, skip processing for it.
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         DCHECK(escape_char_ != '\0');
         xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
             xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -270,7 +263,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
         DCHECK_GE(n, 0);
         DCHECK_LT(n, 16);
 
-        if (PROCESS_ESCAPES) {
+        if (process_escapes) {
           // Determine if there was an escape character between [last_col_idx, n]
           bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
           current_column_has_escape_ |= escaped;
@@ -280,11 +273,11 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
         // clear current bit
         delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
 
-        RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer + n - next_column_start,
+        RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
             &next_column_start, num_fields, field_locations));
       }
 
-      if (PROCESS_ESCAPES) {
+      if (process_escapes) {
         // Determine if there was an escape character between (last_col_idx, 15)
         bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
         current_column_has_escape_ |= unprocessed_escape;
@@ -303,8 +296,9 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
       last_char_is_escape_ = false;
     }
 
-    if (!last_char_is_escape_ && IsFieldOrCollectionItemDelimiter(*buffer)) {
-      RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer - next_column_start,
+    if (!last_char_is_escape_ &&
+          (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
+      RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
           &next_column_start, num_fields, field_locations));
     }
 
@@ -314,7 +308,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
 
   // Last column does not have a delimiter after it.  Add that column and also
   // pad with empty cols if the input is ragged.
-  return FillColumns<PROCESS_ESCAPES>(buffer - next_column_start,
+  return FillColumns<process_escapes>(buffer - next_column_start,
       &next_column_start, num_fields, field_locations);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..346a18a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -73,7 +73,7 @@ Status HdfsSequenceScanner::InitNewRange() {
   text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
       scan_node_->hdfs_table()->null_column_value()));
 
-  delimited_text_parser_.reset(new SequenceDelimitedTextParser(
+  delimited_text_parser_.reset(new DelimitedTextParser(
       scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
       scan_node_->is_materialized_col(), '\0', hdfs_partition->field_delim(),
       hdfs_partition->collection_delim(), hdfs_partition->escape_char()));

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..4845edb 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -153,7 +153,6 @@
 
 namespace impala {
 
-template <bool>
 class DelimitedTextParser;
 
 class HdfsSequenceScanner : public BaseSequenceScanner {
@@ -223,7 +222,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
 
   /// Helper class for picking fields and rows from delimited text.
-  boost::scoped_ptr<DelimitedTextParser<false>> delimited_text_parser_;
+  boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
   std::vector<FieldLocation> field_locations_;
 
   /// Data that is fixed across headers.  This struct is shared between scan ranges.

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index b78115d..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -203,7 +203,7 @@ Status HdfsTextScanner::InitNewRange() {
     collection_delim = '\0';
   }
 
-  delimited_text_parser_.reset(new TupleDelimitedTextParser(
+  delimited_text_parser_.reset(new DelimitedTextParser(
       scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
       scan_node_->is_materialized_col(), hdfs_partition->line_delim(),
       field_delim, collection_delim, hdfs_partition->escape_char()));

http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 25886ba..610c612 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -25,7 +25,6 @@
 
 namespace impala {
 
-template<bool>
 class DelimitedTextParser;
 class ScannerContext;
 struct HdfsFileDesc;
@@ -238,7 +237,7 @@ class HdfsTextScanner : public HdfsScanner {
   int slot_idx_;
 
   /// Helper class for picking fields and rows from delimited text.
-  boost::scoped_ptr<DelimitedTextParser<true>> delimited_text_parser_;
+  boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
 
   /// Return field locations from the Delimited Text Parser.
   std::vector<FieldLocation> field_locations_;


[4/5] impala git commit: IMPALA-5384, part 1: introduce DmlExecState

Posted by kw...@apache.org.
IMPALA-5384, part 1: introduce DmlExecState

This change is based on a patch by Marcel Kornacker.

Move data structures that collect DML operation stats from the
RuntimeState and Coordinator into a new DmlExecState class, which
has its own lock.  This removes a dependency on the coordinator's
lock, which will allow further coordinator locking cleanup in the next
patch.
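
The essence of the change is that DML statistics get their own
synchronization instead of sharing the coordinator's lock. A rough Python
sketch of that pattern, loosely modelled on the AddPartition/UpdatePartition
calls visible in the diffs (the real class is C++ and also tracks files to
move, Kudu stats, and more):

import threading

class DmlExecState(object):
    """Collects per-partition DML stats behind its own lock (sketch only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._partitions = {}  # partition name -> stats

    def add_partition(self, name, partition_id, base_dir=None):
        with self._lock:
            self._partitions[name] = {"id": partition_id,
                                      "base_dir": base_dir,
                                      "num_modified_rows": 0}

    def update_partition(self, name, num_rows):
        with self._lock:
            self._partitions[name]["num_modified_rows"] += num_rows

# Sinks and the coordinator synchronize only on this object, so merging DML
# stats no longer requires holding the coordinator's lock.
state = DmlExecState()
state.add_partition("p=2018-03-29", partition_id=7)  # illustrative partition
state.update_partition("p=2018-03-29", 42)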

Change-Id: Id4c025917620a7bff2acbeb46464f107ab4b7565
Reviewed-on: http://gerrit.cloudera.org:8080/9793
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/408ee4d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/408ee4d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/408ee4d6

Branch: refs/heads/master
Commit: 408ee4d68cbb7217eba4b020064a1e878e412ce8
Parents: b78daed
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Mar 23 16:28:27 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 06:29:16 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc         |   1 +
 be/src/exec/catalog-op-executor.cc          |   1 +
 be/src/exec/data-sink.cc                    |  63 ---
 be/src/exec/data-sink.h                     |   9 -
 be/src/exec/hbase-table-sink.cc             |  11 +-
 be/src/exec/hdfs-table-sink.cc              |  38 +-
 be/src/exec/kudu-table-sink.cc              |  22 +-
 be/src/exec/plan-root-sink.cc               |   1 +
 be/src/runtime/CMakeLists.txt               |   1 +
 be/src/runtime/coordinator-backend-state.cc |  11 +-
 be/src/runtime/coordinator-backend-state.h  |  18 +-
 be/src/runtime/coordinator.cc               | 361 ++---------------
 be/src/runtime/coordinator.h                |  84 +---
 be/src/runtime/dml-exec-state.cc            | 494 +++++++++++++++++++++++
 be/src/runtime/dml-exec-state.h             | 149 +++++++
 be/src/runtime/query-state.cc               |  13 +-
 be/src/runtime/runtime-filter-bank.cc       |   1 +
 be/src/runtime/runtime-state.h              |  29 +-
 be/src/service/client-request-state.cc      |  10 +-
 be/src/service/client-request-state.h       |   1 +
 be/src/service/impala-beeswax-server.cc     |  18 +-
 be/src/service/impala-hs2-server.cc         |   1 +
 be/src/service/impala-http-handler.cc       |   2 +
 be/src/service/impala-server.cc             |   2 +
 be/src/service/impala-server.h              |   3 -
 be/src/testutil/in-process-servers.cc       |   1 +
 26 files changed, 750 insertions(+), 595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b10a70f..1b995a5 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -53,6 +53,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "service/fe-support.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 12398cf..164187c 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -25,6 +25,7 @@
 #include "runtime/lib-cache.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
+#include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f8f068e..9140b3e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -121,69 +121,6 @@ Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
   return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
 }
 
-void DataSink::MergeDmlStats(const TInsertStats& src_stats,
-    TInsertStats* dst_stats) {
-  dst_stats->bytes_written += src_stats.bytes_written;
-  if (src_stats.__isset.kudu_stats) {
-    if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats());
-    if (!dst_stats->kudu_stats.__isset.num_row_errors) {
-      dst_stats->kudu_stats.__set_num_row_errors(0);
-    }
-    dst_stats->kudu_stats.__set_num_row_errors(
-        dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
-  }
-  if (src_stats.__isset.parquet_stats) {
-    if (dst_stats->__isset.parquet_stats) {
-      MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
-          &dst_stats->parquet_stats.per_column_size);
-    } else {
-      dst_stats->__set_parquet_stats(src_stats.parquet_stats);
-    }
-  }
-}
-
-string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
-    const string& prefix) {
-  const char* indent = "  ";
-  stringstream ss;
-  ss << prefix;
-  bool first = true;
-  for (const PartitionStatusMap::value_type& val: stats) {
-    if (!first) ss << endl;
-    first = false;
-    ss << "Partition: ";
-
-    const string& partition_key = val.first;
-    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
-      ss << "Default" << endl;
-    } else {
-      ss << partition_key << endl;
-    }
-    if (val.second.__isset.num_modified_rows) {
-      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
-    }
-
-    if (!val.second.__isset.stats) continue;
-    const TInsertStats& stats = val.second.stats;
-    if (stats.__isset.kudu_stats) {
-      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
-    }
-
-    ss << indent << "BytesWritten: "
-       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
-    if (stats.__isset.parquet_stats) {
-      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
-      ss << endl << indent << "Per Column Sizes:";
-      for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
-           i != parquet_stats.per_column_size.end(); ++i) {
-        ss << endl << indent << indent << i->first << ": "
-           << PrettyPrinter::Print(i->second, TUnit::BYTES);
-      }
-    }
-  }
-  return ss.str();
-}
-
 Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   DCHECK(parent_mem_tracker != nullptr);
   DCHECK(profile_ != nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d2e80af..605b46d 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,15 +85,6 @@ class DataSink {
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
 
-  /// Merges one update to the DML stats for a partition. dst_stats will have the
-  /// combined stats of src_stats and dst_stats after this method returns.
-  static void MergeDmlStats(const TInsertStats& src_stats,
-      TInsertStats* dst_stats);
-
-  /// Outputs the DML stats contained in the map of partition updates to a string
-  static std::string OutputDmlStats(const PartitionStatusMap& stats,
-      const std::string& prefix = "");
-
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 9e591ac..567e8bd 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -59,12 +59,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track
   RETURN_IF_ERROR(hbase_table_writer_->Init(state));
 
   // Add a 'root partition' status in which to collect insert statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.__set_id(-1L);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
-
+  state->dml_exec_state()->AddPartition(ROOT_PARTITION_KEY, -1L, nullptr);
   return Status::OK();
 }
 
@@ -74,8 +69,8 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
-      batch->num_rows();
+  state->dml_exec_state()->UpdatePartition(
+      ROOT_PARTITION_KEY, batch->num_rows(), nullptr);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index c12b711..ca08bac 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -403,8 +403,9 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   COUNTER_ADD(files_created_counter_, 1);
 
   if (!ShouldSkipStaging(state, output_partition)) {
-    // Save the ultimate destination for this file (it will be moved by the coordinator)
-    (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location;
+    // Save the ultimate destination for this file (it will be moved by the coordinator).
+    state->dml_exec_state()->AddFileToMove(
+        output_partition->current_file_name, final_location);
   }
 
   ++output_partition->num_files;
@@ -573,21 +574,15 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
       return status;
     }
 
-    // Save the partition name so that the coordinator can create the partition directory
-    // structure if needed
-    DCHECK(state->per_partition_status()->find(partition->partition_name) ==
-        state->per_partition_status()->end());
-    TInsertPartitionStatus partition_status;
-    partition_status.__set_num_modified_rows(0L);
-    partition_status.__set_id(partition_descriptor->id());
-    partition_status.__set_stats(TInsertStats());
-    partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
-    state->per_partition_status()->insert(
-        make_pair(partition->partition_name, partition_status));
+    // Save the partition name so that the coordinator can create the partition
+    // directory structure if needed.
+    state->dml_exec_state()->AddPartition(
+        partition->partition_name, partition_descriptor->id(),
+        &table_desc_->hdfs_base_dir());
 
     if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
-      // Indicate that temporary directory is to be deleted after execution
-      (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
+      // Indicate that temporary directory is to be deleted after execution.
+      state->dml_exec_state()->AddFileToMove(partition->tmp_hdfs_dir_name, "");
     }
 
     partition_keys_to_output_partitions_[key].first = std::move(partition);
@@ -643,17 +638,8 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
   // OutputPartition writer could be nullptr if there is no row to output.
   if (partition->writer.get() != nullptr) {
     RETURN_IF_ERROR(partition->writer->Finalize());
-
-    // Track total number of appended rows per partition in runtime
-    // state. partition->num_rows counts number of rows appended is per-file.
-    PartitionStatusMap::iterator it =
-        state->per_partition_status()->find(partition->partition_name);
-
-    // Should have been created in GetOutputPartition() when the partition was
-    // initialised.
-    DCHECK(it != state->per_partition_status()->end());
-    it->second.num_modified_rows += partition->num_rows;
-    DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
+    state->dml_exec_state()->UpdatePartition(
+        partition->partition_name, partition->num_rows, &partition->writer->stats());
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 05f1d06..67bb86e 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,6 @@ using kudu::client::KuduError;
 
 namespace impala {
 
-const static string& ROOT_PARTITION_KEY =
-    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
@@ -92,15 +89,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
       << "TableDescriptor must be an instance KuduTableDescriptor.";
   table_desc_ = static_cast<const KuduTableDescriptor*>(table_desc);
 
-  // Add a 'root partition' status in which to collect write statistics
-  TInsertPartitionStatus root_status;
-  root_status.__set_num_modified_rows(0L);
-  root_status.__set_id(-1L);
-  TKuduDmlStats kudu_dml_stats;
-  kudu_dml_stats.__set_num_row_errors(0L);
-  root_status.__set_stats(TInsertStats());
-  root_status.stats.__set_kudu_stats(kudu_dml_stats);
-  state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
+  state->dml_exec_state()->InitForKuduDml();
 
   // Add counters
   total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
@@ -338,12 +327,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
     VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
   }
   Status status = CheckForErrors(state);
-  TInsertPartitionStatus& insert_status =
-      (*state->per_partition_status())[ROOT_PARTITION_KEY];
-  insert_status.__set_num_modified_rows(
-      total_rows_->value() - num_row_errors_->value());
-  insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
-  insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
+  state->dml_exec_state()->SetKuduDmlStats(
+      total_rows_->value() - num_row_errors_->value(), num_row_errors_->value(),
+      client_->GetLatestObservedTimestamp());
   return status;
 }
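
The Kudu path collapses into two calls; a hedged sketch (the counter values and the
timestamp below are stand-ins for the sink's counters and Kudu client):

#include <cstdint>
#include "runtime/dml-exec-state.h"

// Illustrative only: mirrors KuduTableSink::Prepare() and FlushFinal() above.
void SketchKuduSinkUsage(impala::DmlExecState* dml_state) {
  // Prepare(): create the single ROOT_PARTITION_KEY entry with Kudu stats enabled.
  dml_state->InitForKuduDml();

  // FlushFinal(): publish totals gathered during execution.
  int64_t total_rows = 1000;      // stand-in for total_rows_->value()
  int64_t row_errors = 3;         // stand-in for num_row_errors_->value()
  int64_t latest_ts = 123456789;  // stand-in for client_->GetLatestObservedTimestamp()
  dml_state->SetKuduDmlStats(total_rows - row_errors, row_errors, latest_ts);
}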
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 1cdc544..836a376 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -117,6 +117,7 @@ void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
   // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
   // well.
+  // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_?
   sender_done_ = true;
   consumer_cv_.NotifyAll();
   // Wait for consumer to be done, in case sender tries to tear-down this sink while the

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0d4b61c..89cbbb9 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Runtime
   data-stream-sender.cc
   debug-options.cc
   descriptors.cc
+  dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
   hbase-table.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 914a3e4..e8db00e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -43,10 +43,7 @@ Coordinator::BackendState::BackendState(
     const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
   : query_id_(query_id),
     state_idx_(state_idx),
-    filter_mode_(filter_mode),
-    rpc_latency_(0),
-    rpc_sent_(false),
-    peak_consumption_(0L) {
+    filter_mode_(filter_mode) {
 }
 
 void Coordinator::BackendState::Init(
@@ -413,11 +410,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
     const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
     ObjectPool* obj_pool)
   : exec_params_(exec_params),
-    profile_(nullptr),
-    done_(false),
-    profile_created_(false),
-    total_split_size_(0),
-    total_ranges_complete_(0) {
+    profile_(nullptr) {
   const string& profile_name = Substitute("Instance $0 (host=$1)",
       PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
   profile_ = RuntimeProfile::Create(obj_pool, profile_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 0973ca3..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -161,24 +161,24 @@ class Coordinator::BackendState {
     int64_t last_report_time_ms_ = 0;
 
     /// owned by coordinator object pool provided in the c'tor, created in Update()
-    RuntimeProfile* profile_;
+    RuntimeProfile* profile_ = nullptr;
 
     /// true if the final report has been received for the fragment instance.
     /// Used to handle duplicate done ReportExecStatus RPC messages. Used only
     /// in ApplyExecStatusReport()
-    bool done_;
+    bool done_ = false;
 
     /// true after the first call to profile->Update()
-    bool profile_created_;
+    bool profile_created_ = false;
 
     /// cumulative size of all splits; set in c'tor
-    int64_t total_split_size_;
+    int64_t total_split_size_ = 0;
 
     /// wall clock timer for this instance
     MonotonicStopWatch stopwatch_;
 
     /// total scan ranges complete across all scan nodes
-    int64_t total_ranges_complete_;
+    int64_t total_ranges_complete_ = 0;
 
     /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
     std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
@@ -215,7 +215,7 @@ class Coordinator::BackendState {
   boost::mutex lock_;
 
   // number of in-flight instances
-  int num_remaining_instances_;
+  int num_remaining_instances_ = 0;
 
   /// If the status indicates an error status, execution has either been aborted by the
   /// executing impalad (which then reported the error) or cancellation has been
@@ -235,15 +235,15 @@ class Coordinator::BackendState {
   ErrorLogMap error_log_;
 
   /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
-  int64_t rpc_latency_;
+  int64_t rpc_latency_ = 0;
 
   /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
   /// successful.
-  bool rpc_sent_;
+  bool rpc_sent_ = false;
 
   /// peak memory used for this query (value of that node's query memtracker's
   /// peak_consumption()
-  int64_t peak_consumption_;
+  int64_t peak_consumption_ = 0;
 
   /// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
   int64_t last_report_time_ms_ = 0;
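
The BackendState/InstanceStats changes above switch zero-initialization from constructor
initializer lists to C++11 in-class default member initializers; a tiny standalone
illustration of the idiom (class and member names here are made up):

#include <cstdint>

class BackendStateSketch {
 public:
  BackendStateSketch() = default;  // members still start out as 0 / false
 private:
  int64_t rpc_latency_ = 0;
  bool rpc_sent_ = false;
  int64_t peak_consumption_ = 0;
};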

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a5d53b2..e6b3bca 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -63,10 +63,7 @@ using std::unique_ptr;
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
-DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
-    "INSERTs will inherit the permissions of their parent directories");
-
-namespace impala {
+using namespace impala;
 
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
@@ -93,9 +90,6 @@ Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
   DCHECK(request.plan_exec_info.size() > 0);
 
-  needs_finalization_ = request.__isset.finalize_params;
-  if (needs_finalization_) finalize_params_ = request.finalize_params;
-
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
              << " stmt=" << request.query_ctx.client_request.stmt;
   stmt_type_ = request.stmt_type;
@@ -489,291 +483,32 @@ Status Coordinator::UpdateStatus(const Status& status, const string& backend_hos
   return query_status_;
 }
 
-void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
-    PermissionCache* permissions_cache) {
-  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
-  // location (e.g. host:port) if so.
-  int scheme_end = path_str.find("://");
-  string stripped_str;
-  if (scheme_end != string::npos) {
-    // Skip past the subsequent location:port/ prefix.
-    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
-  } else {
-    stripped_str = path_str;
-  }
-
-  // Get the list of path components, used to build all path prefixes.
-  vector<string> components;
-  split(components, stripped_str, is_any_of("/"));
-
-  // Build a set of all prefixes (including the complete string) of stripped_path. So
-  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
-  vector<string> prefixes;
-  // Stores the current prefix
-  stringstream accumulator;
-  for (const string& component: components) {
-    if (component.empty()) continue;
-    accumulator << "/" << component;
-    prefixes.push_back(accumulator.str());
-  }
-
-  // Now for each prefix, stat() it to see if a) it exists and b) if so what its
-  // permissions are. When we meet a directory that doesn't exist, we record the fact that
-  // we need to create it, and the permissions of its parent dir to inherit.
-  //
-  // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
-  // for each path. If we need to create the directory, we record it as the pair (true,
-  // perms) so that the caller can identify which directories need their permissions
-  // explicitly set.
-
-  // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
-  // current dir doesn't exist).
-  short permissions = 0;
-  for (const string& path: prefixes) {
-    PermissionCache::const_iterator it = permissions_cache->find(path);
-    if (it == permissions_cache->end()) {
-      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
-      if (info != nullptr) {
-        // File exists, so fill the cache with its current permissions.
-        permissions_cache->insert(
-            make_pair(path, make_pair(false, info->mPermissions)));
-        permissions = info->mPermissions;
-        hdfsFreeFileInfo(info, 1);
-      } else {
-        // File doesn't exist, so we need to set its permissions to its immediate parent
-        // once it's been created.
-        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
-      }
-    } else {
-      permissions = it->second.second;
-    }
-  }
-}
-
-Status Coordinator::FinalizeSuccessfulInsert() {
-  PermissionCache permissions_cache;
-  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
-  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
-
-  // INSERT finalization happens in the five following steps
-  // 1. If OVERWRITE, remove all the files in the target directory
-  // 2. Create all the necessary partition directories.
-  HdfsTableDescriptor* hdfs_table;
-  RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx_.desc_tbl,
-      finalize_params_.table_id, obj_pool(), &hdfs_table));
-  DCHECK(hdfs_table != nullptr)
-      << "INSERT target table not known in descriptor table: "
-      << finalize_params_.table_id;
-
-  // Loop over all partitions that were updated by this insert, and create the set of
-  // filesystem operations required to create the correct partition structure on disk.
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    // INSERT allows writes to tables that have partitions on multiple filesystems.
-    // So we need to open connections to different filesystems as necessary. We use a
-    // local connection cache and populate it with one connection per filesystem that the
-    // partitions are on.
-    hdfsFS partition_fs_connection;
-    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      partition.second.partition_base_dir, &partition_fs_connection,
-          &filesystem_connection_cache));
-
-    // Look up the partition in the descriptor table.
-    stringstream part_path_ss;
-    if (partition.second.id == -1) {
-      // If this is a non-existant partition, use the default partition location of
-      // <base_dir>/part_key_1=val/part_key_2=val/...
-      part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
-    } else {
-      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
-      DCHECK(part != nullptr)
-          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id
-          << "\n" <<  PrintThrift(runtime_state()->instance_ctx());
-      part_path_ss << part->location();
-    }
-    const string& part_path = part_path_ss.str();
-    bool is_s3_path = IsS3APath(part_path.c_str());
-
-    // If this is an overwrite insert, we will need to delete any updated partitions
-    if (finalize_params_.is_overwrite) {
-      if (partition.first.empty()) {
-        // If the root directory is written to, then the table must not be partitioned
-        DCHECK(per_partition_status_.size() == 1);
-        // We need to be a little more careful, and only delete data files in the root
-        // because the tmp directories the sink(s) wrote are there also.
-        // So only delete files in the table directory - all files are treated as data
-        // files by Hive and Impala, but directories are ignored (and may legitimately
-        // be used to store permanent non-table data by other applications).
-        int num_files = 0;
-        // hfdsListDirectory() only sets errno if there is an error, but it doesn't set
-        // it to 0 if the call succeed. When there is no error, errno could be any
-        // value. So need to clear errno before calling it.
-        // Once HDFS-8407 is fixed, the errno reset won't be needed.
-        errno = 0;
-        hdfsFileInfo* existing_files =
-            hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
-        if (existing_files == nullptr && errno == EAGAIN) {
-          errno = 0;
-          existing_files =
-              hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
-        }
-        // hdfsListDirectory() returns nullptr not only when there is an error but also
-        // when the directory is empty(HDFS-8407). Need to check errno to make sure
-        // the call fails.
-        if (existing_files == nullptr && errno != 0) {
-          return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
-        }
-        for (int i = 0; i < num_files; ++i) {
-          const string filename = path(existing_files[i].mName).filename().string();
-          if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
-            partition_create_ops.Add(DELETE, existing_files[i].mName);
-          }
-        }
-        hdfsFreeFileInfo(existing_files, num_files);
-      } else {
-        // This is a partition directory, not the root directory; we can delete
-        // recursively with abandon, after checking that it ever existed.
-        // TODO: There's a potential race here between checking for the directory
-        // and a third-party deleting it.
-        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-          // There is no directory structure in S3, so "inheriting" permissions is not
-          // possible.
-          // TODO: Try to mimic inheriting permissions for S3.
-          PopulatePathPermissionCache(
-              partition_fs_connection, part_path, &permissions_cache);
-        }
-        // S3 doesn't have a directory structure, so we technically wouldn't need to
-        // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
-        // carrying out an operation on that path. So we still need to call CREATE_DIR
-        // before we access that path due to this limitation.
-        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
-          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
-        } else {
-          // Otherwise just create the directory.
-          partition_create_ops.Add(CREATE_DIR, part_path);
-        }
-      }
-    } else if (!is_s3_path
-        || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
-      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
-      // would have already been created by the table sinks.
-      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
-        PopulatePathPermissionCache(
-            partition_fs_connection, part_path, &permissions_cache);
-      }
-      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
-        partition_create_ops.Add(CREATE_DIR, part_path);
-      }
-    }
-  }
-
-  // We're done with the HDFS descriptor - free up its resources.
-  hdfs_table->ReleaseResources();
-  hdfs_table = nullptr;
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
-          "FinalizationTimer"));
-    if (!partition_create_ops.Execute(
-        ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
-        // It's ok to ignore errors creating the directories, since they may already
-        // exist. If there are permission errors, we'll run into them later.
-        if (err.first->op() != CREATE_DIR) {
-          return Status(Substitute(
-              "Error(s) deleting partition directories. First error (of $0) was: $1",
-              partition_create_ops.errors().size(), err.second));
-        }
-      }
-    }
-  }
-
-  // 3. Move all tmp files
-  HdfsOperationSet move_ops(&filesystem_connection_cache);
-  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
-
-  for (FileMoveMap::value_type& move: files_to_move_) {
-    // Empty destination means delete, so this is a directory. These get deleted in a
-    // separate pass to ensure that we have moved all the contents of the directory first.
-    if (move.second.empty()) {
-      VLOG_ROW << "Deleting file: " << move.first;
-      dir_deletion_ops.Add(DELETE, move.first);
-    } else {
-      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
-      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
-        move_ops.Add(RENAME, move.first, move.second);
-      } else {
-        move_ops.Add(MOVE, move.first, move.second);
-      }
-    }
-  }
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer"));
-    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) moving partition files. First error (of "
-         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 4. Delete temp directories
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
-         "FinalizationTimer"));
-    if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) deleting staging directories. First error (of "
-         << dir_deletion_ops.errors().size() << ") was: "
-         << dir_deletion_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  // 5. Optionally update the permissions of the created partition directories
-  // Do this last so that we don't make a dir unwritable before we write to it.
-  if (FLAGS_insert_inherit_permissions) {
-    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
-    for (const PermissionCache::value_type& perm: permissions_cache) {
-      bool new_dir = perm.second.first;
-      if (new_dir) {
-        short permissions = perm.second.second;
-        VLOG_QUERY << "INSERT created new directory: " << perm.first
-                   << ", inherited permissions are: " << oct << permissions;
-        chmod_ops.Add(CHMOD, perm.first, permissions);
-      }
-    }
-    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      stringstream ss;
-      ss << "Error(s) setting permissions on newly created partition directories. First"
-         << " error (of " << chmod_ops.errors().size() << ") was: "
-         << chmod_ops.errors()[0].second;
-      return Status(ss.str());
-    }
-  }
-
-  return Status::OK();
-}
-
-Status Coordinator::FinalizeQuery() {
+Status Coordinator::FinalizeHdfsInsert() {
   // All instances must have reported their final statuses before finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to clean up the
   // staging directory.
   DCHECK(has_called_wait_);
-  DCHECK(needs_finalization_);
+  DCHECK(finalize_params() != nullptr);
 
   VLOG_QUERY << "Finalizing query: " << query_id();
   SCOPED_TIMER(finalization_timer_);
   Status return_status = GetStatus();
   if (return_status.ok()) {
-    return_status = FinalizeSuccessfulInsert();
+    HdfsTableDescriptor* hdfs_table;
+    RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
+            finalize_params()->table_id, obj_pool(), &hdfs_table));
+    DCHECK(hdfs_table != nullptr)
+        << "INSERT target table not known in descriptor table: "
+        << finalize_params()->table_id;
+    return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
+        query_ctx().client_request.query_options.s3_skip_insert_staging,
+        hdfs_table, query_profile_);
+    hdfs_table->ReleaseResources();
   }
 
   stringstream staging_dir;
-  DCHECK(finalize_params_.__isset.staging_dir);
-  staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id(),"_") << "/";
+  DCHECK(finalize_params()->__isset.staging_dir);
+  staging_dir << finalize_params()->staging_dir << "/" << PrintId(query_id(),"_") << "/";
 
   hdfsFS hdfs_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn));
@@ -813,26 +548,26 @@ Status Coordinator::Wait() {
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported
-  // relevant state. They only have relevant state to report in the parallel
-  // INSERT case, otherwise all the relevant state is from the coordinator
-  // fragment which will be available after Open() returns.
-  // Ignore the returned status if finalization is required., since FinalizeQuery() will
-  // pick it up and needs to execute regardless.
+  // Query finalization can only happen when all backends have reported relevant
+  // state. They only have relevant state to report in the parallel INSERT case,
+  // otherwise all the relevant state is from the coordinator fragment which will be
+  // available after Open() returns.  Ignore the returned status if finalization is
+  // required, since FinalizeHdfsInsert() will pick it up and needs to execute
+  // required, since FinalizeHdfsInsert() will pick it up and needs to execute
+  // regardless.
   Status status = WaitForBackendCompletion();
-  if (!needs_finalization_ && !status.ok()) return status;
+  if (finalize_params() == nullptr && !status.ok()) return status;
 
   // Execution of query fragments has finished. We don't need to hold onto query execution
   // resources while we finalize the query.
   ReleaseExecResources();
   // Query finalization is required only for HDFS table sinks
-  if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
   // Release admission control resources after we'd done the potentially heavyweight
   // finalization.
   ReleaseAdmissionControlResources();
 
   query_profile_->AddInfoString(
-      "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
+      "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
   // For DML queries, when Wait is done, the query is complete.
   ComputeQuerySummary();
   return status;
@@ -929,7 +664,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
   if (params.__isset.insert_exec_status) {
-    UpdateInsertExecStatus(params.insert_exec_status);
+    dml_exec_state_.Update(params.insert_exec_status);
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
@@ -971,52 +706,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   return Status::OK();
 }
 
-void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) {
-  lock_guard<mutex> l(lock_);
-  for (const PartitionStatusMap::value_type& partition:
-       insert_exec_status.per_partition_status) {
-    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-    status->__set_num_modified_rows(
-        status->num_modified_rows + partition.second.num_modified_rows);
-    status->__set_kudu_latest_observed_ts(std::max(
-        partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
-    status->__set_id(partition.second.id);
-    status->__set_partition_base_dir(partition.second.partition_base_dir);
-
-    if (partition.second.__isset.stats) {
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeDmlStats(partition.second.stats, &status->stats);
-    }
-  }
-  files_to_move_.insert(
-      insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end());
-}
-
-
-uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
-  uint64_t max_ts = 0;
-  for (const auto& entry : per_partition_status_) {
-    max_ts = std::max(max_ts,
-        static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
-  }
-  return max_ts;
-}
-
 RuntimeState* Coordinator::runtime_state() {
   return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }
 
-bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
-  // Assume we are called only after all fragments have completed
-  DCHECK(has_called_wait_);
-
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
-    catalog_update->created_partitions.insert(partition.first);
-  }
-
-  return catalog_update->created_partitions.size() != 0;
-}
-
 // TODO: add histogram/percentile
 void Coordinator::ComputeQuerySummary() {
   // In this case, the query did not even get to start all fragment instances.
@@ -1285,4 +978,8 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   }
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
+
+const TFinalizeParams* Coordinator::finalize_params() const {
+  return schedule_.request().__isset.finalize_params
+      ? &schedule_.request().finalize_params : nullptr;
 }
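
The needs_finalization_ flag and copied finalize_params_ are replaced by a nullable
accessor derived from the schedule; roughly, the pattern is as follows (TQueryExecRequest
is assumed to come from gen-cpp/Frontend_types.h, as included by coordinator.h):

#include "gen-cpp/Frontend_types.h"

// Illustrative free-function version of Coordinator::finalize_params().
const impala::TFinalizeParams* FinalizeParamsOrNull(
    const impala::TQueryExecRequest& request) {
  return request.__isset.finalize_params ? &request.finalize_params : nullptr;
}

// Callers then test for presence instead of consulting a separate bool, e.g.
//   if (FinalizeParamsOrNull(request) != nullptr) { /* HDFS INSERT finalization */ }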

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d630b9a..6665c08 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,7 +38,8 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
+#include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
 #include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
@@ -130,9 +131,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Idempotent.
   void Cancel(const Status* cause = nullptr);
 
-  /// Updates execution status of a particular backend as well as Insert-related
-  /// status (per_partition_status_ and files_to_move_). Also updates
-  /// num_remaining_backends_ and cancels execution if the backend has an error status.
+  /// Updates execution status of a particular backend as well as dml_exec_state_.
+  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+  /// error status.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
@@ -149,17 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   MemTracker* query_mem_tracker() const;
 
   /// This is safe to call only after Wait()
-  const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
-
-  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
-  /// was executed, or 0 if there were no Kudu timestamps reported.
-  /// This should only be called after Wait().
-  uint64_t GetLatestKuduInsertTimestamp() const;
-
-  /// Gathers all updates to the catalog required once this query has completed execution.
-  /// Returns true if a catalog update is required, false otherwise.
-  /// Must only be called after Wait()
-  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
   /// individual fragment instances are merged into a single output to retain readability.
@@ -229,12 +220,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// True if the query needs a post-execution step to tidy up
-  bool needs_finalization_ = false;
-
-  /// Only valid if needs_finalization is true
-  TFinalizeParams finalize_params_;
-
   /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this
   boost::mutex wait_lock_;
 
@@ -275,6 +260,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   ExecSummary exec_summary_;
 
+  /// Filled in as the query completes and tracks the results of DML queries.  This is
+  /// either the union of the reports from all fragment instances, or taken from the
+  /// coordinator fragment: only one of the two can legitimately produce updates.
+  DmlExecState dml_exec_state_;
+
   /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
   RuntimeProfile* query_profile_ = nullptr;
 
@@ -308,21 +298,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// hits 0, any Wait()'ing thread is notified
   int num_remaining_backends_ = 0;
 
-  /// The following two structures, partition_row_counts_ and files_to_move_ are filled in
-  /// as the query completes, and track the results of INSERT queries that alter the
-  /// structure of tables. They are either the union of the reports from all fragment
-  /// instances, or taken from the coordinator fragment: only one of the two can
-  /// legitimately produce updates.
-
-  /// The set of partitions that have been written to or updated by all fragment
-  /// instances, along with statistics such as the number of rows written (may be 0). For
-  /// unpartitioned tables, the empty string denotes the entire table.
-  PartitionStatusMap per_partition_status_;
-
-  /// The set of files to move after an INSERT query has run, in (src, dest) form. An
-  /// empty string for the destination means that a file is to be deleted.
-  FileMoveMap files_to_move_;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
@@ -357,6 +332,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  /// Returns request's finalize params, or nullptr if not present. If not present, then
+  /// HDFS INSERT finalization is not required.
+  const TFinalizeParams* finalize_params() const;
+
+  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+
   /// Only valid *after* calling Exec(). Return nullptr if the running query does not
   /// produce any rows.
   RuntimeState* runtime_state();
@@ -381,9 +362,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateStatus(const Status& status, const std::string& backend_hostname,
       bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
 
-  /// Update per_partition_status_ and files_to_move_.
-  void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);
-
   /// Returns only when either all execution backends have reported success or the query
   /// is in error. Returns the status of the query.
   /// It is safe to call this concurrently, but any calls must be made only after Exec().
@@ -403,29 +381,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// profiles must not be updated while this is running.
   void ComputeQuerySummary();
 
-  /// TODO: move the next 3 functions into a separate class
-
-  /// Determines what the permissions of directories created by INSERT statements should
-  /// be if permission inheritance is enabled. Populates a map from all prefixes of
-  /// path_str (including the full path itself) which is a path in Hdfs, to pairs
-  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
-  /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
-  /// the most immediate ancestor of the path that does exist, i.e. the permissions that
-  /// the path should inherit when created. Otherwise permissions is set to the actual
-  /// permissions of the path. The PermissionCache argument is also used to cache the
-  /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
-  /// same path.
-  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
-  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
-      PermissionCache* permissions_cache);
-
-  /// Moves all temporary staging files to their final destinations.
-  Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT;
-
-  /// Perform any post-query cleanup required. Called by Wait() only after all fragment
-  /// instances have returned, or if the query has failed, in which case it only cleans up
-  /// temporary data rather than finishing the INSERT in flight.
-  Status FinalizeQuery() WARN_UNUSED_RESULT;
+  /// Perform any post-query cleanup required for HDFS (or other Hadoop FileSystem)
+  /// INSERT. Called by Wait() only after all fragment instances have returned, or if
+  /// the query has failed, in which case it only cleans up temporary data rather than
+  /// finishing the INSERT in flight.
+  Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
   /// Populates backend_states_, starts query execution at all backends in parallel, and
   /// blocks until startup completes.

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
new file mode 100644
index 0000000..6853da5
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.cc
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/dml-exec-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/filesystem.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "util/pretty-printer.h"
+#include "util/container-util.h"
+#include "util/hdfs-bulk-ops.h"
+#include "util/hdfs-util.h"
+#include "util/runtime-profile-counters.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/exec-env.h"
+#include "gen-cpp/ImpalaService_types.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include "common/names.h"
+
+DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
+    "INSERTs will inherit the permissions of their parent directories");
+
+using namespace impala;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+
+string DmlExecState::OutputPartitionStats(const string& prefix) {
+  lock_guard<mutex> l(lock_);
+  const char* indent = "  ";
+  stringstream ss;
+  ss << prefix;
+  bool first = true;
+  for (const PartitionStatusMap::value_type& val: per_partition_status_) {
+    if (!first) ss << endl;
+    first = false;
+    ss << "Partition: ";
+    const string& partition_key = val.first;
+    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
+      ss << "Default" << endl;
+    } else {
+      ss << partition_key << endl;
+    }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
+    const TInsertStats& stats = val.second.stats;
+    if (stats.__isset.kudu_stats) {
+      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    }
+
+    ss << indent << "BytesWritten: "
+       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
+    if (stats.__isset.parquet_stats) {
+      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
+      ss << endl << indent << "Per Column Sizes:";
+      for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
+           i != parquet_stats.per_column_size.end(); ++i) {
+        ss << endl << indent << indent << i->first << ": "
+           << PrettyPrinter::Print(i->second, TUnit::BYTES);
+      }
+    }
+  }
+  return ss.str();
+}
+
+void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition:
+           dml_exec_status.per_partition_status) {
+    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+    status->__set_num_modified_rows(
+        status->num_modified_rows + partition.second.num_modified_rows);
+    status->__set_kudu_latest_observed_ts(max<uint64_t>(
+            partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
+    status->__set_id(partition.second.id);
+    status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+    if (partition.second.__isset.stats) {
+      if (!status->__isset.stats) status->__set_stats(TInsertStats());
+      MergeDmlStats(partition.second.stats, &status->stats);
+    }
+  }
+  files_to_move_.insert(
+      dml_exec_status.files_to_move.begin(), dml_exec_status.files_to_move.end());
+}
+
+void DmlExecState::AddFileToMove(const string& file_name, const string& location) {
+  lock_guard<mutex> l(lock_);
+  files_to_move_[file_name] = location;
+}
+
+uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
+  lock_guard<mutex> l(lock_);
+  uint64_t max_ts = 0;
+  for (const auto& entry : per_partition_status_) {
+    max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts);
+  }
+  return max_ts;
+}
+
+int64_t DmlExecState::GetNumModifiedRows() {
+  lock_guard<mutex> l(lock_);
+  int64_t result = 0;
+  for (const PartitionStatusMap::value_type& p: per_partition_status_) {
+    result += p.second.num_modified_rows;
+  }
+  return result;
+}
+
+bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
+  lock_guard<mutex> l(lock_);
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+    catalog_update->created_partitions.insert(partition.first);
+  }
+  return catalog_update->created_partitions.size() != 0;
+}
+
+Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
+    bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
+    RuntimeProfile* profile) {
+  lock_guard<mutex> l(lock_);
+  PermissionCache permissions_cache;
+  HdfsFsCache::HdfsFsMap filesystem_connection_cache;
+  HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
+
+  // INSERT finalization happens in the five following steps
+  // 1. If OVERWRITE, remove all the files in the target directory
+  // 2. Create all the necessary partition directories.
+
+  // Loop over all partitions that were updated by this insert, and create the set of
+  // filesystem operations required to create the correct partition structure on disk.
+  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    // INSERT allows writes to tables that have partitions on multiple filesystems.
+    // So we need to open connections to different filesystems as necessary. We use a
+    // local connection cache and populate it with one connection per filesystem that the
+    // partitions are on.
+    hdfsFS partition_fs_connection;
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+            partition.second.partition_base_dir, &partition_fs_connection,
+            &filesystem_connection_cache));
+
+    // Look up the partition in the descriptor table.
+    stringstream part_path_ss;
+    if (partition.second.id == -1) {
+      // If this is a non-existent partition, use the default partition location of
+      // <base_dir>/part_key_1=val/part_key_2=val/...
+      part_path_ss << params.hdfs_base_dir << "/" << partition.first;
+    } else {
+      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
+      DCHECK(part != nullptr)
+          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id;
+      part_path_ss << part->location();
+    }
+    const string& part_path = part_path_ss.str();
+    bool is_s3_path = IsS3APath(part_path.c_str());
+
+    // If this is an overwrite insert, we will need to delete any updated partitions
+    if (params.is_overwrite) {
+      if (partition.first.empty()) {
+        // If the root directory is written to, then the table must not be partitioned
+        DCHECK(per_partition_status_.size() == 1);
+        // We need to be a little more careful, and only delete data files in the root
+        // because the tmp directories the sink(s) wrote are there also.
+        // So only delete files in the table directory - all files are treated as data
+        // files by Hive and Impala, but directories are ignored (and may legitimately
+        // be used to store permanent non-table data by other applications).
+        int num_files = 0;
+        // hdfsListDirectory() only sets errno if there is an error, but it doesn't set
+        // it to 0 if the call succeeds. When there is no error, errno could be any
+        // value, so we need to clear errno before calling it.
+        // Once HDFS-8407 is fixed, the errno reset won't be needed.
+        errno = 0;
+        hdfsFileInfo* existing_files =
+            hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+        if (existing_files == nullptr && errno == EAGAIN) {
+          errno = 0;
+          existing_files =
+              hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+        }
+        // hdfsListDirectory() returns nullptr not only when there is an error but also
+        // when the directory is empty (HDFS-8407). Need to check errno to make sure
+        // the call actually failed.
+        if (existing_files == nullptr && errno != 0) {
+          return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
+        }
+        for (int i = 0; i < num_files; ++i) {
+          const string filename =
+              boost::filesystem::path(existing_files[i].mName).filename().string();
+          if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
+            partition_create_ops.Add(DELETE, existing_files[i].mName);
+          }
+        }
+        hdfsFreeFileInfo(existing_files, num_files);
+      } else {
+        // This is a partition directory, not the root directory; we can delete
+        // recursively with abandon, after checking that it ever existed.
+        // TODO: There's a potential race here between checking for the directory
+        // and a third-party deleting it.
+        if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+          // There is no directory structure in S3, so "inheriting" permissions is not
+          // possible.
+          // TODO: Try to mimic inheriting permissions for S3.
+          PopulatePathPermissionCache(
+              partition_fs_connection, part_path, &permissions_cache);
+        }
+        // S3 doesn't have a directory structure, so we technically wouldn't need to
+        // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
+        // carrying out an operation on that path. So we still need to call CREATE_DIR
+        // before we access that path due to this limitation.
+        if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
+          partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
+        } else {
+          // Otherwise just create the directory.
+          partition_create_ops.Add(CREATE_DIR, part_path);
+        }
+      }
+    } else if (!is_s3_path || !s3_skip_insert_staging) {
+      // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
+      // would have already been created by the table sinks.
+      if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+        PopulatePathPermissionCache(
+            partition_fs_connection, part_path, &permissions_cache);
+      }
+      if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
+        partition_create_ops.Add(CREATE_DIR, part_path);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+            "FinalizationTimer"));
+    if (!partition_create_ops.Execute(
+            ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
+        // It's ok to ignore errors creating the directories, since they may already
+        // exist. If there are permission errors, we'll run into them later.
+        if (err.first->op() != CREATE_DIR) {
+          return Status(Substitute(
+                  "Error(s) deleting partition directories. First error (of $0) was: $1",
+                  partition_create_ops.errors().size(), err.second));
+        }
+      }
+    }
+  }
+
+  // 3. Move all tmp files
+  HdfsOperationSet move_ops(&filesystem_connection_cache);
+  HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
+
+  for (FileMoveMap::value_type& move: files_to_move_) {
+    // Empty destination means delete, so this is a directory. These get deleted in a
+    // separate pass to ensure that we have moved all the contents of the directory first.
+    if (move.second.empty()) {
+      VLOG_ROW << "Deleting file: " << move.first;
+      dir_deletion_ops.Add(DELETE, move.first);
+    } else {
+      VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
+      if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+        move_ops.Add(RENAME, move.first, move.second);
+      } else {
+        move_ops.Add(MOVE, move.first, move.second);
+      }
+    }
+  }
+
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", "FinalizationTimer"));
+    if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) moving partition files. First error (of "
+         << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 4. Delete temp directories
+  {
+    SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", "FinalizationTimer"));
+    if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) deleting staging directories. First error (of "
+         << dir_deletion_ops.errors().size() << ") was: "
+         << dir_deletion_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+
+  // 5. Optionally update the permissions of the created partition directories
+  // Do this last so that we don't make a dir unwritable before we write to it.
+  if (FLAGS_insert_inherit_permissions) {
+    HdfsOperationSet chmod_ops(&filesystem_connection_cache);
+    for (const PermissionCache::value_type& perm: permissions_cache) {
+      bool new_dir = perm.second.first;
+      if (new_dir) {
+        short permissions = perm.second.second;
+        VLOG_QUERY << "INSERT created new directory: " << perm.first
+                   << ", inherited permissions are: " << oct << permissions;
+        chmod_ops.Add(CHMOD, perm.first, permissions);
+      }
+    }
+    if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+      stringstream ss;
+      ss << "Error(s) setting permissions on newly created partition directories. First"
+         << " error (of " << chmod_ops.errors().size() << ") was: "
+         << chmod_ops.errors()[0].second;
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
+    PermissionCache* permissions_cache) {
+  // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
+  // location (e.g. host:port) if so.
+  int scheme_end = path_str.find("://");
+  string stripped_str;
+  if (scheme_end != string::npos) {
+    // Skip past the subsequent location:port/ prefix.
+    stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
+  } else {
+    stripped_str = path_str;
+  }
+
+  // Get the list of path components, used to build all path prefixes.
+  vector<string> components;
+  split(components, stripped_str, is_any_of("/"));
+
+  // Build a set of all prefixes (including the complete string) of stripped_str. So
+  // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
+  vector<string> prefixes;
+  // Stores the current prefix
+  stringstream accumulator;
+  for (const string& component: components) {
+    if (component.empty()) continue;
+    accumulator << "/" << component;
+    prefixes.push_back(accumulator.str());
+  }
+
+  // Now for each prefix, stat() it to see if a) it exists and b) if so what its
+  // permissions are. When we meet a directory that doesn't exist, we record the fact that
+  // we need to create it, and the permissions of its parent dir to inherit.
+  //
+  // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
+  // for each path. If we need to create the directory, we record it as the pair (true,
+  // perms) so that the caller can identify which directories need their permissions
+  // explicitly set.
+
+  // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
+  // current dir doesn't exist).
+  short permissions = 0;
+  for (const string& path: prefixes) {
+    PermissionCache::const_iterator it = permissions_cache->find(path);
+    if (it == permissions_cache->end()) {
+      hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
+      if (info != nullptr) {
+        // File exists, so fill the cache with its current permissions.
+        permissions_cache->insert(
+            make_pair(path, make_pair(false, info->mPermissions)));
+        permissions = info->mPermissions;
+        hdfsFreeFileInfo(info, 1);
+      } else {
+        // File doesn't exist, so we need to set its permissions to its immediate parent
+        // once it's been created.
+        permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
+      }
+    } else {
+      permissions = it->second.second;
+    }
+  }
+}
+
+bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+  lock_guard<mutex> l(lock_);
+  bool set_thrift = false;
+  if (files_to_move_.size() > 0) {
+    dml_status->__set_files_to_move(files_to_move_);
+    set_thrift = true;
+  }
+  if (per_partition_status_.size() > 0) {
+    dml_status->__set_per_partition_status(per_partition_status_);
+    set_thrift = true;
+  }
+  return set_thrift;
+}
+
+void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
+  lock_guard<mutex> l(lock_);
+  int64_t num_row_errors = 0;
+  bool has_kudu_stats = false;
+  for (const PartitionStatusMap::value_type& v: per_partition_status_) {
+    insert_result->rows_modified[v.first] = v.second.num_modified_rows;
+    if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+      has_kudu_stats = true;
+    }
+    num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+  }
+  if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+}
+
+void DmlExecState::AddPartition(
+    const string& name, int64_t id, const string* base_dir) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(id);
+  status.__isset.stats = true;
+  if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+  per_partition_status_.insert(make_pair(name, status));
+}
+
+void DmlExecState::UpdatePartition(const string& partition_name,
+    int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.num_modified_rows += num_modified_rows_delta;
+  if (insert_stats == nullptr) return;
+  MergeDmlStats(*insert_stats, &entry->second.stats);
+}
+
+void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
+  dst->bytes_written += src.bytes_written;
+  if (src.__isset.kudu_stats) {
+    dst->__isset.kudu_stats = true;
+    if (!dst->kudu_stats.__isset.num_row_errors) {
+      dst->kudu_stats.__set_num_row_errors(0);
+    }
+    dst->kudu_stats.__set_num_row_errors(
+        dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+  }
+  if (src.__isset.parquet_stats) {
+    if (dst->__isset.parquet_stats) {
+      MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
+          &dst->parquet_stats.per_column_size);
+    } else {
+      dst->__set_parquet_stats(src.parquet_stats);
+    }
+  }
+}
+
+void DmlExecState::InitForKuduDml() {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
+  TInsertPartitionStatus status;
+  status.__set_num_modified_rows(0L);
+  status.__set_id(-1L);
+  status.__isset.stats = true;
+  status.stats.__isset.kudu_stats = true;
+  per_partition_status_.insert(make_pair(partition_name, status));
+}
+
+void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+    int64_t latest_ts) {
+  // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  lock_guard<mutex> l(lock_);
+  PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+  DCHECK(entry != per_partition_status_.end());
+  entry->second.__set_num_modified_rows(num_modified_rows);
+  entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
+  entry->second.__set_kudu_latest_observed_ts(latest_ts);
+}
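
On the coordinator side, the merged state is consumed roughly as follows; a hedged
sketch using only methods defined in this file (the report and catalog-update objects
are assumed to be supplied by the caller):

#include <cstdint>
#include "runtime/dml-exec-state.h"

// Illustrative only: condenses the coordinator-side consumption of the state.
void SketchCoordinatorUsage(impala::DmlExecState* dml_state,
    const impala::TInsertExecStatus& backend_report,
    impala::TUpdateCatalogRequest* catalog_update) {
  // Merge a backend's per-partition stats and file-move requests into the global view.
  dml_state->Update(backend_report);

  // After Wait(): summarize for the query profile and for the client response.
  std::string dml_stats = dml_state->OutputPartitionStats("\n");
  int64_t num_rows = dml_state->GetNumModifiedRows();
  uint64_t kudu_ts = dml_state->GetKuduLatestObservedTimestamp();
  (void)dml_stats; (void)num_rows; (void)kudu_ts;

  // Collect the set of partitions the catalog needs to learn about, if any.
  if (dml_state->PrepareCatalogUpdate(catalog_update)) {
    // ... issue the catalog update RPC (omitted) ...
  }
}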

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
new file mode 100644
index 0000000..728284a
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DML_EXEC_STATE_H
+#define IMPALA_RUNTIME_DML_EXEC_STATE_H
+
+#include <string>
+#include <map>
+#include <boost/unordered_map.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+
+namespace impala {
+
+class TInsertExecStatus;
+class TInsertResult;
+class TInsertStats;
+class TFinalizeParams;
+class TUpdateCatalogRequest;
+class TInsertPartitionStatus;
+class RuntimeProfile;
+class HdfsTableDescriptor;
+
+/// DmlExecState manages the state related to the execution of a DML statement
+/// (creation of new files, new partitions, etc.).
+///
+/// During DML execution, the table sink adds per-partition status using AddPartition()
+/// and then UpdatePartition() for non-Kudu tables.  For Kudu tables, the sink adds DML
+/// stats using InitForKuduDml() followed by SetKuduDmlStats().  In the case of the
+/// HDFS sink, it will also record the collection of files that should be moved by the
+/// coordinator on finalization using AddFileToMove().
+///
+/// The state is then serialized to thrift and merged at the coordinator using
+/// Update().  The coordinator will then use OutputPartitionStats(),
+/// GetKuduLatestObservedTimestamp(), PrepareCatalogUpdate() and FinalizeHdfsInsert()
+/// to perform various finalization tasks.
+///
+
+/// Thread-safe.
+class DmlExecState {
+ public:
+  /// Merge values from 'dml_exec_status'.
+  void Update(const TInsertExecStatus& dml_exec_status);
+
+  /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
+  /// It is an error to call this for an existing partition.
+  void AddPartition(const std::string& name, int64_t id, const std::string* base_dir);
+
+  /// Merge given values into stats for partition with name 'partition_name'.
+  /// Ignores 'insert_stats' if nullptr.
+  /// Requires that the partition already exist.
+  void UpdatePartition(const std::string& partition_name,
+      int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
+
+  /// Used to initialize this state when executing a Kudu DML statement. Must be called
+  /// before SetKuduDmlStats().
+  void InitForKuduDml();
+
+  /// Update stats for a Kudu DML sink. Requires that InitForKuduDml() was already called.
+  void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+      int64_t latest_ts);
+
+  /// Adds new file/location to the move map.
+  void AddFileToMove(const std::string& file_name, const std::string& location);
+
+  /// Outputs the partition stats to a string.
+  std::string OutputPartitionStats(const std::string& prefix);
+
+  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
+  /// was executed, or 0 if there were no Kudu timestamps reported.
+  uint64_t GetKuduLatestObservedTimestamp();
+
+  /// Return the total number of modified rows across all partitions.
+  int64_t GetNumModifiedRows();
+
+  /// Populates 'catalog_update' with PartitionStatusMap data.
+  /// Returns true if a catalog update is required, false otherwise.
+  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+
+  /// For HDFS (and other Hadoop FileSystem) INSERT, moves all temporary staging files
+  /// to their final destinations, as indicated by 'params', and creates new partitions
+  /// for 'hdfs_table' as required.  Adds child timers to profile for the various
+  /// stages of finalization.  If the table is on an S3 path and
+  /// 's3_skip_insert_staging' is true, does not create new partition directories.
+  Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
+      HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;
+
+  /// Serialize to thrift. Returns true if any fields of 'dml_status' were set.
+  bool ToThrift(TInsertExecStatus* dml_status);
+
+  /// Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
+  /// Beeswax.
+  void ToTInsertResult(TInsertResult* insert_result);
+
+ private:
+  /// Protects all fields below.
+  boost::mutex lock_;
+
+  /// Counts how many rows a DML query has added to a particular partition (partitions
+  /// are identified by their partition keys: k1=v1/k2=v2 etc.). Unpartitioned tables
+  /// have a single 'default' partition which is identified by ROOT_PARTITION_KEY.
+  /// Uses ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
+  PartitionStatusMap per_partition_status_;
+
+  /// Tracks files to move from a temporary (key) to a final destination (value) as
+  /// part of query finalization. If the destination is empty, the file is to be
+  /// deleted.  Uses ordered map so that iteration order is deterministic.
+  typedef std::map<std::string, std::string> FileMoveMap;
+  FileMoveMap files_to_move_;
+
+  /// Determines what the permissions of directories created by INSERT statements should
+  /// be if permission inheritance is enabled. Populates a map from all prefixes of
+  /// 'path_str' (including the full path itself) which is a path in Hdfs, to pairs
+  /// (does_not_exist, permissions), where does_not_exist is true if the path does not
+  /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
+  /// the most immediate ancestor of the path that does exist, i.e. the permissions that
+  /// the path should inherit when created. Otherwise permissions is set to the actual
+  /// permissions of the path. The PermissionCache argument is also used to cache the
+  /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
+  /// same path.
+  typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
+  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
+      PermissionCache* permissions_cache);
+
+  /// Merge 'src' into 'dst'. Not thread-safe.
+  void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
+};
+
+}
+
+#endif

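To make the intended call sequence concrete, here is a hypothetical sketch of how a table sink and the coordinator might drive this class, using only the signatures declared above. The partition name, file paths, row counts, timestamp and stats prefix are invented for illustration; the real callers are the HDFS/Kudu table sinks and the Coordinator.

    #include <cstdint>
    #include <string>

    #include "runtime/dml-exec-state.h"

    // Hypothetical HDFS sink side: register a partition, report rows, stage a file move.
    void HdfsSinkSideSketch(impala::DmlExecState* state) {
      state->AddPartition("p=1", /*id=*/0, /*base_dir=*/nullptr);
      state->UpdatePartition("p=1", /*num_modified_rows_delta=*/100,
          /*insert_stats=*/nullptr);
      state->AddFileToMove("/staging/f0.parq", "/warehouse/tbl/p=1/f0.parq");
    }

    // Hypothetical Kudu sink side: one global stats entry instead of per-partition state.
    void KuduSinkSideSketch(impala::DmlExecState* state) {
      state->InitForKuduDml();
      state->SetKuduDmlStats(/*num_modified_rows=*/100, /*num_row_errors=*/2,
          /*latest_ts=*/12345);
    }

    // Hypothetical coordinator side: merge a serialized backend report, read the results.
    void CoordinatorSideSketch(impala::DmlExecState* state,
        const impala::TInsertExecStatus& report) {
      state->Update(report);
      int64_t total_rows = state->GetNumModifiedRows();
      std::string stats = state->OutputPartitionStats("Partition: ");
      (void)total_rows;
      (void)stats;
    }
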
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1cc646d..ad5748f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -236,18 +236,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     // Only send updates to insert status if fragment is finished, the coordinator waits
     // until query execution is done to use them anyhow.
     RuntimeState* state = fis->runtime_state();
-    if (done && (state->hdfs_files_to_move()->size() > 0
-        || state->per_partition_status()->size() > 0)) {
-      TInsertExecStatus insert_status;
-      if (state->hdfs_files_to_move()->size() > 0) {
-        insert_status.__set_files_to_move(*state->hdfs_files_to_move());
-      }
-      if (state->per_partition_status()->size() > 0) {
-        insert_status.__set_per_partition_status(*state->per_partition_status());
-      }
-      params.__set_insert_exec_status(insert_status);
+    if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) {
+      params.__isset.insert_exec_status = true;
     }
-
     // Send new errors to coordinator
     state->GetUnreportedErrors(&params.error_log);
     params.__isset.error_log = (params.error_log.size() > 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 239e066..4e23a42 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index cd2b061..4b005b2 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
 #include "runtime/thread-resource-mgr.h"
+#include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
@@ -56,22 +57,6 @@ namespace io {
   class DiskIoMgr;
 }
 
-/// TODO: move the typedefs into a separate .h (and fix the includes for that)
-
-/// Counts how many rows an INSERT query has added to a particular partition
-/// (partitions are identified by their partition keys: k1=v1/k2=v2
-/// etc. Unpartitioned tables have a single 'default' partition which is
-/// identified by ROOT_PARTITION_KEY.
-typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
-
-/// Stats per partition for insert queries. They key is the same as for PartitionRowCount
-typedef std::map<std::string, TInsertStats> PartitionInsertStats;
-
-/// Tracks files to move from a temporary (key) to a final destination (value) as
-/// part of query finalization. If the destination is empty, the file is to be
-/// deleted.
-typedef std::map<std::string, std::string> FileMoveMap;
-
 /// A collection of items that are part of the global state of a query and shared across
 /// all execution nodes of that query. After initialisation, callers must call
 /// ReleaseResources() to ensure that all resources are correctly freed before
@@ -133,8 +118,6 @@ class RuntimeState {
   }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
-  FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
     root_node_id_ = id;
@@ -146,7 +129,7 @@ class RuntimeState {
 
   RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
-  PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
+  DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return profile_; }
@@ -341,12 +324,8 @@ class RuntimeState {
   /// state is responsible for returning this pool to the thread mgr.
   ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
 
-  /// Temporary Hdfs files created, and where they should be moved to ultimately.
-  /// Mapping a filename to a blank destination causes it to be deleted.
-  FileMoveMap hdfs_files_to_move_;
-
-  /// Records summary statistics for the results of inserts into Hdfs partitions.
-  PartitionStatusMap per_partition_status_;
+  /// Execution state for DML statements.
+  DmlExecState dml_exec_state_;
 
   RuntimeProfile* const profile_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ac6178c..2aedcab 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -570,7 +571,8 @@ void ClientRequestState::Done() {
   // must happen before taking lock_ below.
   if (coord_.get() != NULL) {
     // This is safe to access on coord_ after Wait() has been called.
-    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    uint64_t latest_kudu_ts =
+        coord_->dml_exec_state()->GetKuduLatestObservedTimestamp();
     if (latest_kudu_ts > 0) {
       VLOG_RPC << "Updating session (id=" << session_id()  << ") with latest "
                << "observed Kudu timestamp: " << latest_kudu_ts;
@@ -918,7 +920,7 @@ Status ClientRequestState::UpdateCatalog() {
     catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
-    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+    if (!coord()->dml_exec_state()->PrepareCatalogUpdate(&catalog_update)) {
       VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                  << query_id() << ")";
     } else {
@@ -1027,9 +1029,7 @@ void ClientRequestState::SetCreateTableAsSelectResultSet() {
   // operation.
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
-    for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_modified_rows;
-    }
+    total_num_rows_inserted = coord_->dml_exec_state()->GetNumModifiedRows();
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
   VLOG_QUERY << summary_msg;

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0c05bc4..657f3de 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -24,6 +24,7 @@
 #include "scheduling/query-schedule.h"
 #include "service/child-query.h"
 #include "service/impala-server.h"
+#include "service/query-result-set.h"
 #include "util/auth-util.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b827ff3..c441285 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -20,10 +20,12 @@
 #include "common/logging.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/auth-util.h"
@@ -563,22 +565,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
       // need to revisit this, since that might lead us to insert a row without a
       // coordinator, depending on how we choose to drive the table sink.
-      int64_t num_row_errors = 0;
-      bool has_kudu_stats = false;
       if (request_state->coord() != nullptr) {
-        for (const PartitionStatusMap::value_type& v:
-             request_state->coord()->per_partition_status()) {
-          const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_modified[partition_status.first] =
-              partition_status.second.num_modified_rows;
-
-          if (partition_status.second.__isset.stats &&
-              partition_status.second.stats.__isset.kudu_stats) {
-            has_kudu_stats = true;
-          }
-          num_row_errors += partition_status.second.stats.kudu_stats.num_row_errors;
-        }
-        if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+        request_state->coord()->dml_exec_state()->ToTInsertResult(insert_result);
       }
     }
   }

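Given the ToTInsertResult() implementation earlier in this patch, the Beeswax extension now receives a rows_modified map keyed by partition and, only when Kudu stats were present, a num_row_errors total. A sketch of how a caller could read that result (the logging and the example values in the comments are illustrative, not part of this change):

    // Illustrative only: reading the TInsertResult filled in by ToTInsertResult().
    void LogDmlSummarySketch(const TInsertResult& insert_result) {
      // rows_modified maps partition keys to row counts, e.g. {"p=1": 100, "p=2": 42}.
      for (const auto& entry : insert_result.rows_modified) {
        VLOG_QUERY << "Partition " << entry.first << ": " << entry.second << " row(s)";
      }
      // num_row_errors is only set when some partition carried Kudu stats.
      if (insert_result.__isset.num_row_errors) {
        VLOG_QUERY << "Kudu reported " << insert_result.num_row_errors << " row error(s)";
      }
    }
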
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4381f81..80ace87 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -34,6 +34,7 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
 #include "runtime/raw-value.h"
 #include "runtime/exec-env.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 43b03d1..3841bfe 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,6 +24,7 @@
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
+#include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -31,6 +32,7 @@
 #include "runtime/timestamp-value.inline.h"
 #include "service/impala-server.h"
 #include "service/client-request-state.h"
+#include "service/frontend.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"