Posted to commits@impala.apache.org by kw...@apache.org on 2018/03/29 23:30:45 UTC
[1/5] impala git commit: IMPALA-6731: Move execnet Python dependency to stage 2
Repository: impala
Updated Branches:
refs/heads/master 8091b2f46 -> 1c4775d92
IMPALA-6731: Move execnet Python dependency to stage 2
It seems that execnet also cannot be installed together with
setuptools-scm if only a local mirror and index are available
(similar to https://github.com/pywebhdfs/pywebhdfs/issues/52).
Testing: Observed that execnet failed to install during
bootstrap_toolchain.py on a CentOS 6.4 EC2 instance at 5:02pm (within the
brownout period). With this change, bootstrap_toolchain.py succeeded.
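For context, a minimal sketch of the two-stage install that this split
enables (illustrative only: this is not the actual bootstrap_toolchain.py
logic, and the mirror path is hypothetical). The presumed mechanism is that
execnet needs setuptools-scm to already be installed when it builds, so it
can only be installed after stage 1 completes:

  import subprocess
  import sys

  LOCAL_MIRROR = "file:///opt/pypi-mirror"  # hypothetical mirror location

  def pip_install(requirements_file):
      # --no-index restricts pip to the local mirror, the scenario in
      # which installing execnet alongside setuptools-scm failed.
      subprocess.check_call([sys.executable, "-m", "pip", "install",
                             "--no-index", "--find-links", LOCAL_MIRROR,
                             "-r", requirements_file])

  # Stage 1: includes build tools such as setuptools-scm.
  pip_install("infra/python/deps/requirements.txt")
  # Stage 2: hdfs, docopt, and execnet, which can now see stage-1 tools.
  pip_install("infra/python/deps/stage2-requirements.txt")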
Change-Id: Ic949edcc03f0e068bdd84b6ede487e64dcf2439b
Reviewed-on: http://gerrit.cloudera.org:8080/9850
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2194dfd0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2194dfd0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2194dfd0
Branch: refs/heads/master
Commit: 2194dfd0ff63a51c1b17b1bbcd78895d0c2d6951
Parents: 8091b2f
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Mar 28 16:58:18 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 04:38:31 2018 +0000
----------------------------------------------------------------------
infra/python/deps/requirements.txt | 3 ---
infra/python/deps/stage2-requirements.txt | 3 +++
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/2194dfd0/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index bea16f4..d55bc19 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -39,9 +39,6 @@ Flask == 0.10.1
MarkupSafe == 0.23
Werkzeug == 0.11.3
itsdangerous == 0.24
-hdfs == 2.0.2
- docopt == 0.6.2
- execnet == 1.4.0
kazoo == 2.2.1
ordereddict == 1.1
pexpect == 3.3
http://git-wip-us.apache.org/repos/asf/impala/blob/2194dfd0/infra/python/deps/stage2-requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/stage2-requirements.txt b/infra/python/deps/stage2-requirements.txt
index eda2cd3..c7c947c 100644
--- a/infra/python/deps/stage2-requirements.txt
+++ b/infra/python/deps/stage2-requirements.txt
@@ -27,6 +27,9 @@ pytest == 2.9.2
pytest-random == 0.02
pytest-runner == 4.2
pytest-xdist == 1.17.1
+hdfs == 2.0.2
+ docopt == 0.6.2
+ execnet == 1.4.0
# Requires pbr
pywebhdfs == 0.3.2
[3/5] impala git commit: IMPALA-5384, part 1: introduce DmlExecState
Posted by kw...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6c62a19..5eeb52a 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -49,6 +49,7 @@
#include "rpc/thrift-thread.h"
#include "rpc/thrift-util.h"
#include "runtime/client-cache.h"
+#include "runtime/coordinator.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
@@ -59,6 +60,7 @@
#include "service/impala-http-handler.h"
#include "service/impala-internal-service.h"
#include "service/client-request-state.h"
+#include "service/frontend.h"
#include "util/bit-util.h"
#include "util/container-util.h"
#include "util/debug-util.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 23d4687..2af89fd 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -34,7 +34,6 @@
#include "gen-cpp/Frontend_types.h"
#include "rpc/thrift-server.h"
#include "common/status.h"
-#include "service/frontend.h"
#include "service/query-options.h"
#include "util/condition-variable.h"
#include "util/metrics.h"
@@ -43,8 +42,6 @@
#include "util/simple-logger.h"
#include "util/thread-pool.h"
#include "util/time.h"
-#include "runtime/coordinator.h"
-#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/types.h"
#include "statestore/statestore-subscriber.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 7ff44a8..19afcd5 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -29,6 +29,7 @@
#include "util/metrics.h"
#include "util/openssl-util.h"
#include "runtime/exec-env.h"
+#include "service/frontend.h"
#include "service/impala-server.h"
#include "common/names.h"
[5/5] impala git commit: IMPALA-6760: Fix for py2.7-ism in run-tests.py.
Posted by kw...@apache.org.
IMPALA-6760: Fix for py2.7-ism in run-tests.py.
A set literal snuck into run-tests.py in a recent
change. We avoid these so that the tests can still
run on py2.6, which has no set-literal syntax.
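For illustration, a minimal sketch of the difference (not taken from the
patch; the variable names are made up). A set literal such as {item} is a
SyntaxError on Python 2.6, while set.add() works on 2.6 and later:

  collected = set()
  for item in ["test_a::case1", "test_b::case2"]:
      # collected.update({item})   # set literal: SyntaxError on Python 2.6
      collected.add(item)          # equivalent, and valid on Python 2.6

The diff below makes exactly this substitution in TestCounterPlugin.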
Change-Id: I81928d1880a493b91abb13b3a8149568c9789f66
Reviewed-on: http://gerrit.cloudera.org:8080/9843
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Philip Zeyliger <ph...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1c4775d9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1c4775d9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1c4775d9
Branch: refs/heads/master
Commit: 1c4775d92abab73f01cc967967aebeacc4ead5de
Parents: 408ee4d
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Wed Mar 28 15:15:42 2018 -0700
Committer: Philip Zeyliger <ph...@cloudera.com>
Committed: Thu Mar 29 15:35:19 2018 +0000
----------------------------------------------------------------------
tests/run-tests.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/1c4775d9/tests/run-tests.py
----------------------------------------------------------------------
diff --git a/tests/run-tests.py b/tests/run-tests.py
index 6967b17..9552d0d 100755
--- a/tests/run-tests.py
+++ b/tests/run-tests.py
@@ -80,7 +80,7 @@ class TestCounterPlugin(object):
# https://docs.pytest.org/en/2.9.2/writing_plugins.html#_pytest.hookspec.pytest_collection_modifyitems
def pytest_collection_modifyitems(self, items):
for item in items:
- self.tests_collected.update({item.nodeid})
+ self.tests_collected.add(item.nodeid)
# link to pytest_runtest_logreport
# https://docs.pytest.org/en/2.9.2/_modules/_pytest/hookspec.html#pytest_runtest_logreport
[2/5] impala git commit: Revert "IMPALA-6389: Make '\0' delimited text files work"
Posted by kw...@apache.org.
Revert "IMPALA-6389: Make '\0' delimited text files work"
This reverts commit c2bdaf8af4cf35d3462595c2a341ed84dcf5d960.
An ASAN issue and potentially other problems have been found;
reverting to unbreak the build and tests.
Change-Id: If581311033de8c26e33316b19192c4579594f261
Reviewed-on: http://gerrit.cloudera.org:8080/9851
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Zach Amsden <za...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b78daedf
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b78daedf
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b78daedf
Branch: refs/heads/master
Commit: b78daedf52bdaf5028bae90f157d37b6c5bea2c6
Parents: 2194dfd
Author: Zach Amsden <za...@cloudera.com>
Authored: Thu Mar 29 03:51:41 2018 +0000
Committer: Zach Amsden <za...@cloudera.com>
Committed: Thu Mar 29 04:59:48 2018 +0000
----------------------------------------------------------------------
be/src/exec/delimited-text-parser-test.cc | 56 ++++---------------
be/src/exec/delimited-text-parser.cc | 74 ++++++++-----------------
be/src/exec/delimited-text-parser.h | 43 +++++---------
be/src/exec/delimited-text-parser.inline.h | 70 +++++++++++------------
be/src/exec/hdfs-sequence-scanner.cc | 2 +-
be/src/exec/hdfs-sequence-scanner.h | 3 +-
be/src/exec/hdfs-text-scanner.cc | 2 +-
be/src/exec/hdfs-text-scanner.h | 3 +-
8 files changed, 84 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser-test.cc b/be/src/exec/delimited-text-parser-test.cc
index d8e977d..3156b36 100644
--- a/be/src/exec/delimited-text-parser-test.cc
+++ b/be/src/exec/delimited-text-parser-test.cc
@@ -24,7 +24,7 @@
namespace impala {
-void Validate(TupleDelimitedTextParser* parser, const string& data,
+void Validate(DelimitedTextParser* parser, const string& data,
int expected_offset, char tuple_delim, int expected_num_tuples,
int expected_num_fields) {
parser->ParserReset();
@@ -72,8 +72,8 @@ TEST(DelimitedTextParser, Basic) {
bool is_materialized_col[NUM_COLS];
for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
- TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
- TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+ DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+ TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
// Note that only complete tuples "count"
Validate(&no_escape_parser, "no_delims", -1, TUPLE_DELIM, 0, 0);
Validate(&no_escape_parser, "abc||abc", 4, TUPLE_DELIM, 1, 1);
@@ -81,9 +81,9 @@ TEST(DelimitedTextParser, Basic) {
Validate(&no_escape_parser, "a|bcd", 2, TUPLE_DELIM, 0, 0);
// Test with escape char
- TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
- TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
- ESCAPE_CHAR);
+ DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+ TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+ ESCAPE_CHAR);
Validate(&escape_parser, "a@|a|bcd", 5, TUPLE_DELIM, 0, 0);
Validate(&escape_parser, "a@@|a|bcd", 4, TUPLE_DELIM, 1, 1);
Validate(&escape_parser, "a@@@|a|bcd", 7, TUPLE_DELIM, 0, 0);
@@ -127,8 +127,8 @@ TEST(DelimitedTextParser, Fields) {
bool is_materialized_col[NUM_COLS];
for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
- TupleDelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
- TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
+ DelimitedTextParser no_escape_parser(NUM_COLS, 0, is_materialized_col,
+ TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM);
Validate(&no_escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
Validate(&no_escape_parser, "b|c,d|e,f", 2, TUPLE_DELIM, 1, 3);
@@ -137,9 +137,9 @@ TEST(DelimitedTextParser, Fields) {
const string str10("a,\0|c,d|e", 9);
Validate(&no_escape_parser, str10, 4, TUPLE_DELIM, 1, 2);
- TupleDelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
- TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
- ESCAPE_CHAR);
+ DelimitedTextParser escape_parser(NUM_COLS, 0, is_materialized_col,
+ TUPLE_DELIM, FIELD_DELIM, COLLECTION_DELIM,
+ ESCAPE_CHAR);
Validate(&escape_parser, "a,b|c,d|e,f", 4, TUPLE_DELIM, 1, 3);
Validate(&escape_parser, "a,@|c|e,f", 6, TUPLE_DELIM, 0, 1);
@@ -148,20 +148,14 @@ TEST(DelimitedTextParser, Fields) {
TEST(DelimitedTextParser, SpecialDelimiters) {
const char TUPLE_DELIM = '\n'; // implies '\r' and "\r\n" are also delimiters
- const char NUL_DELIM = '\0';
const int NUM_COLS = 1;
bool is_materialized_col[NUM_COLS];
for (int i = 0; i < NUM_COLS; ++i) is_materialized_col[i] = true;
- TupleDelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
+ DelimitedTextParser tuple_delim_parser(NUM_COLS, 0, is_materialized_col,
TUPLE_DELIM);
- TupleDelimitedTextParser nul_delim_parser(NUM_COLS, 0, is_materialized_col, NUL_DELIM);
-
- TupleDelimitedTextParser nul_field_parser(2, 0, is_materialized_col,
- TUPLE_DELIM, NUL_DELIM);
-
// Non-SSE case
Validate(&tuple_delim_parser, "A\r\nB", 3, TUPLE_DELIM, 0, 0);
Validate(&tuple_delim_parser, "A\rB", 2, TUPLE_DELIM, 0, 0);
@@ -171,16 +165,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
Validate(&tuple_delim_parser, "A\rB\nC\r\nD", 2, TUPLE_DELIM, 2, 2);
Validate(&tuple_delim_parser, "\r\r\n\n", 1, TUPLE_DELIM, 2, 2);
- // NUL tuple delimiter; no field delimiter
- const string nul1("\0\0\0", 3);
- const string nul2("AAA\0BBB\0", 8);
- const string nul3("\n\0\r\0\r\n\0", 7);
- const string nul4("\n\0\r\0\r\n", 6);
- Validate(&nul_delim_parser, nul1, 1, NUL_DELIM, 2, 2);
- Validate(&nul_delim_parser, nul2, 4, NUL_DELIM, 1, 1);
- Validate(&nul_delim_parser, nul3, 2, NUL_DELIM, 2, 2);
- Validate(&nul_delim_parser, nul4, 2, NUL_DELIM, 1, 1);
-
// SSE case
string data = "\rAAAAAAAAAAAAAAA";
DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -194,22 +178,6 @@ TEST(DelimitedTextParser, SpecialDelimiters) {
data = "\r\nAAA\n\r\r\nAAAAAAA";
DCHECK_EQ(data.size(), SSEUtil::CHARS_PER_128_BIT_REGISTER);
Validate(&tuple_delim_parser, data, 2, TUPLE_DELIM, 3, 3);
-
- // NUL SSE case
- const string nulsse1("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0\0", 32);
- const string nulsse2("AAAAA\0AAAAAAAAAAA\0AAAAAAAAAAAA\0A", 32);
- const string nulsse3("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,ddd\0", 36);
- const string nulsse4("AAA\0BBBbbbbbbbbbbbbbbbbbbb\0cccc,dddd", 36);
- Validate(&nul_delim_parser, nulsse1, 6, NUL_DELIM, 3, 3);
- Validate(&nul_delim_parser, nulsse2, 6, NUL_DELIM, 2, 2);
- Validate(&nul_delim_parser, nulsse3, 4, NUL_DELIM, 2, 2);
- Validate(&nul_delim_parser, nulsse4, 4, NUL_DELIM, 1, 1);
-
- // NUL Field delimiters
- const string field1("\na\0b\0c\n", 7);
- const string field2("aaaa\na\0b\0c\naaaaa\0b\na\0b\0c\n", 25);
- Validate(&nul_field_parser, field1, 1, TUPLE_DELIM, 1, 2);
- Validate(&nul_field_parser, field2, 5, TUPLE_DELIM, 3, 6);
}
// TODO: expand test for other delimited text parser functions/cases.
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 7db65fd..18fcde1 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -24,8 +24,7 @@
using namespace impala;
-template<bool DELIMITED_TUPLES>
-DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
+DelimitedTextParser::DelimitedTextParser(
int num_cols, int num_partition_keys, const bool* is_materialized_col,
char tuple_delim, char field_delim, char collection_item_delim, char escape_char)
: is_materialized_col_(is_materialized_col),
@@ -73,7 +72,7 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
memset(low_mask_, 0, sizeof(low_mask_));
}
- if (DELIMITED_TUPLES) {
+ if (tuple_delim != '\0') {
search_chars[num_delims_++] = tuple_delim_;
++num_tuple_delims_;
// Hive will treats \r (^M) as an alternate tuple delimiter, but \r\n is a
@@ -83,12 +82,12 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
++num_tuple_delims_;
}
xmm_tuple_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
- if (field_delim_ != tuple_delim_) search_chars[num_delims_++] = field_delim_;
- } else {
- search_chars[num_delims_++] = field_delim_;
}
- if (collection_item_delim != '\0') search_chars[num_delims_++] = collection_item_delim_;
+ if (field_delim != '\0' || collection_item_delim != '\0') {
+ search_chars[num_delims_++] = field_delim_;
+ search_chars[num_delims_++] = collection_item_delim_;
+ }
DCHECK_GT(num_delims_, 0);
xmm_delim_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
@@ -96,30 +95,16 @@ DelimitedTextParser<DELIMITED_TUPLES>::DelimitedTextParser(
ParserReset();
}
-template
-DelimitedTextParser<true>::DelimitedTextParser(
- int num_cols, int num_partition_keys, const bool* is_materialized_col,
- char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template
-DelimitedTextParser<false>::DelimitedTextParser(
- int num_cols, int num_partition_keys, const bool* is_materialized_col,
- char tuple_delim, char field_delim, char collection_item_delim, char escape_char);
-
-template<bool DELIMITED_TUPLES>
-void DelimitedTextParser<DELIMITED_TUPLES>::ParserReset() {
+void DelimitedTextParser::ParserReset() {
current_column_has_escape_ = false;
last_char_is_escape_ = false;
last_row_delim_offset_ = -1;
column_idx_ = num_partition_keys_;
}
-template void DelimitedTextParser<true>::ParserReset();
-
// Parsing raw csv data into FieldLocation descriptors.
-template<bool DELIMITED_TUPLES>
-Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples,
- int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
+Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remaining_len,
+ char** byte_buffer_ptr, char** row_end_locations,
FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start) {
// Start of this batch.
@@ -148,10 +133,10 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
while (remaining_len > 0) {
bool new_tuple = false;
bool new_col = false;
- if (DELIMITED_TUPLES) unfinished_tuple_ = true;
+ unfinished_tuple_ = true;
if (!last_char_is_escape_) {
- if (DELIMITED_TUPLES && (**byte_buffer_ptr == tuple_delim_ ||
+ if (tuple_delim_ != '\0' && (**byte_buffer_ptr == tuple_delim_ ||
(tuple_delim_ == '\n' && **byte_buffer_ptr == '\r'))) {
new_tuple = true;
new_col = true;
@@ -181,7 +166,6 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
row_end_locations[*num_tuples] = *byte_buffer_ptr;
++(*num_tuples);
}
- DCHECK(DELIMITED_TUPLES);
unfinished_tuple_ = false;
last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
if (*num_tuples == max_tuples) {
@@ -201,7 +185,7 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
// For formats that store the length of the row, the row is not delimited:
// e.g. Sequence files.
- if (!DELIMITED_TUPLES) {
+ if (tuple_delim_ == '\0') {
DCHECK_EQ(remaining_len, 0);
RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start,
next_column_start, num_fields, field_locations));
@@ -209,30 +193,18 @@ Status DelimitedTextParser<DELIMITED_TUPLES>::ParseFieldLocations(int max_tuples
DCHECK(status.ok());
column_idx_ = num_partition_keys_;
++(*num_tuples);
+ unfinished_tuple_ = false;
}
return Status::OK();
}
-template
-Status DelimitedTextParser<true>::ParseFieldLocations(int max_tuples,
- int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
- FieldLocation* field_locations,
- int* num_tuples, int* num_fields, char** next_column_start);
-
-template
-Status DelimitedTextParser<false>::ParseFieldLocations(int max_tuples,
- int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations,
- FieldLocation* field_locations,
- int* num_tuples, int* num_fields, char** next_column_start);
-
-template<bool DELIMITED_TUPLES>
-int64_t DelimitedTextParser<DELIMITED_TUPLES>::FindFirstInstance(const char* buffer,
- int64_t len) {
+// Find the first instance of the tuple delimiter. This will find the start of the first
+// full tuple in buffer by looking for the end of the previous tuple.
+int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) {
int64_t tuple_start = 0;
const char* buffer_start = buffer;
bool found = false;
- DCHECK(DELIMITED_TUPLES);
// If the last char in the previous buffer was \r then either return the start of
// this buffer or skip a \n at the beginning of the buffer.
if (last_row_delim_offset_ != -1) {
@@ -254,10 +226,13 @@ restart:
int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0);
if (tuple_mask != 0) {
found = true;
- // Find first set bit (1-based)
- int i = ffs(tuple_mask);
- tuple_start += i;
- buffer += i;
+ for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) {
+ if ((tuple_mask & SSEUtil::SSE_BITMASK[i]) != 0) {
+ tuple_start += i + 1;
+ buffer += i + 1;
+ break;
+ }
+ }
break;
}
tuple_start += SSEUtil::CHARS_PER_128_BIT_REGISTER;
@@ -320,6 +295,3 @@ restart:
}
return tuple_start;
}
-
-template
-int64_t DelimitedTextParser<true>::FindFirstInstance(const char* buffer, int64_t len);
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index 9b89127..b966081 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -25,27 +25,22 @@
namespace impala {
-template <bool DELIMITED_TUPLES>
class DelimitedTextParser {
public:
/// The Delimited Text Parser parses text rows that are delimited by specific
/// characters:
- /// tuple_delim: delimits tuples. Only used if DELIMITED_TUPLES is true.
+ /// tuple_delim: delimits tuples
/// field_delim: delimits fields
/// collection_item_delim: delimits collection items
/// escape_char: escape delimiters, make them part of the data.
- ///
- /// If the template parameter DELIMITED_TUPLES is false there is no support
- /// for tuple delimiters and we do not need to search for them. Any value
- /// may be passed for tuple_delim, as it is ignored.
- ///
+ //
/// 'num_cols' is the total number of columns including partition keys.
- ///
+ //
/// 'is_materialized_col' should be initialized to an array of length 'num_cols', with
/// is_materialized_col[i] = <true if column i should be materialized, false otherwise>
/// Owned by caller.
- ///
+ //
/// The main method is ParseData which fills in a vector of pointers and lengths to the
/// fields. It also can handle an escape character which masks a tuple or field
/// delimiter that occurs in the data.
@@ -96,14 +91,14 @@ class DelimitedTextParser {
/// This function is used to parse sequence file records which do not need to
/// parse for tuple delimiters. Returns an error status if any column exceeds the
/// size limit. See AddColumn() for details.
- /// This function is disabled for non-sequence file parsing.
- template <bool PROCESS_ESCAPES>
+ template <bool process_escapes>
Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations,
int* num_fields);
/// FindFirstInstance returns the position after the first non-escaped tuple
/// delimiter from the starting offset.
/// Used to find the start of a tuple if jumping into the middle of a text file.
+ /// Also used to find the sync marker for Sequenced and RC files.
/// If no tuple delimiter is found within the buffer, return -1;
int64_t FindFirstInstance(const char* buffer, int64_t len);
@@ -124,16 +119,13 @@ class DelimitedTextParser {
/// by the number fields added.
/// 'field_locations' will be updated with the start and length of the fields.
/// Returns an error status if 'len' exceeds the size limit specified in AddColumn().
- template <bool PROCESS_ESCAPES>
+ template <bool process_escapes>
Status FillColumns(int64_t len, char** last_column, int* num_fields,
impala::FieldLocation* field_locations);
/// Return true if we have not seen a tuple delimiter for the current tuple being
/// parsed (i.e., the last byte read was not a tuple delimiter).
- bool HasUnfinishedTuple() {
- DCHECK(DELIMITED_TUPLES);
- return unfinished_tuple_;
- }
+ bool HasUnfinishedTuple() { return unfinished_tuple_; }
private:
/// Initialize the parser state.
@@ -141,7 +133,7 @@ class DelimitedTextParser {
/// Helper routine to add a column to the field_locations vector.
/// Template parameter:
- /// PROCESS_ESCAPES -- if true the the column may have escape characters
+ /// process_escapes -- if true the the column may have escape characters
/// and the negative of the len will be stored.
/// len: length of the current column. The length of a column must fit in a 32-bit
/// signed integer (i.e. <= 2147483647 bytes). If a column is larger than that,
@@ -152,29 +144,23 @@ class DelimitedTextParser {
/// Output:
/// field_locations: updated with start and length of current field.
/// Return an error status if 'len' exceeds the size limit specified above.
- template <bool PROCESS_ESCAPES>
+ template <bool process_escapes>
Status AddColumn(int64_t len, char** next_column_start, int* num_fields,
FieldLocation* field_locations);
/// Helper routine to parse delimited text using SSE instructions.
/// Identical arguments as ParseFieldLocations.
- /// If the template argument, 'PROCESS_ESCAPES' is true, this function will handle
+ /// If the template argument, 'process_escapes' is true, this function will handle
/// escapes, otherwise, it will assume the text is unescaped. By using templates,
/// we can special case the un-escaped path for better performance. The unescaped
/// path is optimized away by the compiler. Returns an error status if the length
/// of any column exceeds the size limit. See AddColumn() for details.
- template <bool PROCESS_ESCAPES>
+ template <bool process_escapes>
Status ParseSse(int max_tuples, int64_t* remaining_len,
char** byte_buffer_ptr, char** row_end_locations_,
FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start);
- bool IsFieldOrCollectionItemDelimiter(char c) {
- return (!DELIMITED_TUPLES && c == field_delim_) ||
- (DELIMITED_TUPLES && field_delim_ != tuple_delim_ && c == field_delim_) ||
- (collection_item_delim_ != '\0' && c == collection_item_delim_);
- }
-
/// SSE(xmm) register containing the tuple search character(s).
__m128i xmm_tuple_search_;
@@ -228,7 +214,7 @@ class DelimitedTextParser {
/// Character delimiting collection items (to become slots).
char collection_item_delim_;
- /// Character delimiting tuples. Only used if DELIMITED_TUPLES is true.
+ /// Character delimiting tuples.
char tuple_delim_;
/// Whether or not the current column has an escape character in it
@@ -242,8 +228,5 @@ class DelimitedTextParser {
bool unfinished_tuple_;
};
-using TupleDelimitedTextParser = DelimitedTextParser<true>;
-using SequenceDelimitedTextParser = DelimitedTextParser<false>;
-
}// namespace impala
#endif// IMPALA_EXEC_DELIMITED_TEXT_PARSER_H
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/delimited-text-parser.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 9fe737e..02fa132 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -52,10 +52,9 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
*delim_mask &= ~escape_mask;
}
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
- char** next_column_start, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start,
+ int* num_fields, FieldLocation* field_locations) {
if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) {
return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len);
}
@@ -63,27 +62,26 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::AddColumn(int64_t len,
// Found a column that needs to be parsed, write the start/len to 'field_locations'
field_locations[*num_fields].start = *next_column_start;
int64_t field_len = len;
- if (PROCESS_ESCAPES && current_column_has_escape_) {
+ if (process_escapes && current_column_has_escape_) {
field_len = -len;
}
field_locations[*num_fields].len = static_cast<int32_t>(field_len);
++(*num_fields);
}
- if (PROCESS_ESCAPES) current_column_has_escape_ = false;
+ if (process_escapes) current_column_has_escape_ = false;
*next_column_start += len + 1;
++column_idx_;
return Status::OK();
}
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
- char** last_column, int* num_fields, FieldLocation* field_locations) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column,
+ int* num_fields, FieldLocation* field_locations) {
// Fill in any columns missing from the end of the tuple.
char* dummy = NULL;
if (last_column == NULL) last_column = &dummy;
while (column_idx_ < num_cols_) {
- RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(len, last_column,
+ RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column,
num_fields, field_locations));
// The rest of the columns will be null.
last_column = &dummy;
@@ -105,9 +103,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::FillColumns(int64_t len,
/// Needle = 'abcd000000000000' (we're searching for any a's, b's, c's or d's)
/// Haystack = 'asdfghjklhjbdwwc' (the raw string)
/// Result = '1010000000011001'
-template <bool DELIMITED_TUPLES>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSse(int max_tuples,
int64_t* remaining_len, char** byte_buffer_ptr,
char** row_end_locations, FieldLocation* field_locations,
int* num_tuples, int* num_fields, char** next_column_start) {
@@ -149,7 +146,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
uint16_t escape_mask = 0;
// If the table does not use escape characters, skip processing for it.
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
DCHECK(escape_char_ != '\0');
xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -159,10 +156,8 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
char* last_char = *byte_buffer_ptr + 15;
bool last_char_is_unescaped_delim = delim_mask >> 15;
- if (DELIMITED_TUPLES) {
- unfinished_tuple_ = !(last_char_is_unescaped_delim &&
- (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
- }
+ unfinished_tuple_ = !(last_char_is_unescaped_delim &&
+ (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
int last_col_idx = 0;
// Process all non-zero bits in the delim_mask from lsb->msb. If a bit
@@ -175,7 +170,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
// clear current bit
delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
// Determine if there was an escape character between [last_col_idx, n]
bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
current_column_has_escape_ |= escaped;
@@ -184,14 +179,13 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
char* delim_ptr = *byte_buffer_ptr + n;
- if (IsFieldOrCollectionItemDelimiter(*delim_ptr)) {
- RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+ if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
+ RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
next_column_start, num_fields, field_locations));
continue;
}
- if (DELIMITED_TUPLES &&
- (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r'))) {
+ if (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r')) {
if (UNLIKELY(
last_row_delim_offset_ == *remaining_len - n && *delim_ptr == '\n')) {
// If the row ended in \r\n then move the next start past the \n
@@ -199,7 +193,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
last_row_delim_offset_ = -1;
continue;
}
- RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(delim_ptr - *next_column_start,
+ RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start,
next_column_start, num_fields, field_locations));
Status status = FillColumns<false>(0, NULL, num_fields, field_locations);
DCHECK(status.ok());
@@ -210,7 +204,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
last_row_delim_offset_ = *delim_ptr == '\r' ? *remaining_len - n - 1 : -1;
if (UNLIKELY(*num_tuples == max_tuples)) {
(*byte_buffer_ptr) += (n + 1);
- if (PROCESS_ESCAPES) last_char_is_escape_ = false;
+ if (process_escapes) last_char_is_escape_ = false;
*remaining_len -= (n + 1);
// If the last character we processed was \r then set the offset to 0
// so that we will use it at the beginning of the next batch.
@@ -220,7 +214,7 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
}
}
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
// Determine if there was an escape character between (last_col_idx, 15)
bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
current_column_has_escape_ |= unprocessed_escape;
@@ -233,10 +227,9 @@ inline Status DelimitedTextParser<DELIMITED_TUPLES>::ParseSse(int max_tuples,
}
/// Simplified version of ParseSSE which does not handle tuple delimiters.
-template<>
-template <bool PROCESS_ESCAPES>
-inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len,
- char* buffer, FieldLocation* field_locations, int* num_fields) {
+template <bool process_escapes>
+inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
+ FieldLocation* field_locations, int* num_fields) {
char* next_column_start = buffer;
__m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
@@ -253,7 +246,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
uint16_t escape_mask = 0;
// If the table does not use escape characters, skip processing for it.
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
DCHECK(escape_char_ != '\0');
xmm_escape_mask = SSE4_cmpestrm<SSEUtil::STRCHR_MODE>(xmm_escape_search_, 1,
xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER);
@@ -270,7 +263,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
DCHECK_GE(n, 0);
DCHECK_LT(n, 16);
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
// Determine if there was an escape character between [last_col_idx, n]
bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
current_column_has_escape_ |= escaped;
@@ -280,11 +273,11 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
// clear current bit
delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
- RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer + n - next_column_start,
+ RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start,
&next_column_start, num_fields, field_locations));
}
- if (PROCESS_ESCAPES) {
+ if (process_escapes) {
// Determine if there was an escape character between (last_col_idx, 15)
bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
current_column_has_escape_ |= unprocessed_escape;
@@ -303,8 +296,9 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
last_char_is_escape_ = false;
}
- if (!last_char_is_escape_ && IsFieldOrCollectionItemDelimiter(*buffer)) {
- RETURN_IF_ERROR(AddColumn<PROCESS_ESCAPES>(buffer - next_column_start,
+ if (!last_char_is_escape_ &&
+ (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
+ RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start,
&next_column_start, num_fields, field_locations));
}
@@ -314,7 +308,7 @@ inline Status DelimitedTextParser<false>::ParseSingleTuple(int64_t remaining_len
// Last column does not have a delimiter after it. Add that column and also
// pad with empty cols if the input is ragged.
- return FillColumns<PROCESS_ESCAPES>(buffer - next_column_start,
+ return FillColumns<process_escapes>(buffer - next_column_start,
&next_column_start, num_fields, field_locations);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..346a18a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -73,7 +73,7 @@ Status HdfsSequenceScanner::InitNewRange() {
text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
scan_node_->hdfs_table()->null_column_value()));
- delimited_text_parser_.reset(new SequenceDelimitedTextParser(
+ delimited_text_parser_.reset(new DelimitedTextParser(
scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
scan_node_->is_materialized_col(), '\0', hdfs_partition->field_delim(),
hdfs_partition->collection_delim(), hdfs_partition->escape_char()));
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..4845edb 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -153,7 +153,6 @@
namespace impala {
-template <bool>
class DelimitedTextParser;
class HdfsSequenceScanner : public BaseSequenceScanner {
@@ -223,7 +222,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
/// Helper class for picking fields and rows from delimited text.
- boost::scoped_ptr<DelimitedTextParser<false>> delimited_text_parser_;
+ boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
std::vector<FieldLocation> field_locations_;
/// Data that is fixed across headers. This struct is shared between scan ranges.
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index b78115d..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -203,7 +203,7 @@ Status HdfsTextScanner::InitNewRange() {
collection_delim = '\0';
}
- delimited_text_parser_.reset(new TupleDelimitedTextParser(
+ delimited_text_parser_.reset(new DelimitedTextParser(
scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
scan_node_->is_materialized_col(), hdfs_partition->line_delim(),
field_delim, collection_delim, hdfs_partition->escape_char()));
http://git-wip-us.apache.org/repos/asf/impala/blob/b78daedf/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 25886ba..610c612 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -25,7 +25,6 @@
namespace impala {
-template<bool>
class DelimitedTextParser;
class ScannerContext;
struct HdfsFileDesc;
@@ -238,7 +237,7 @@ class HdfsTextScanner : public HdfsScanner {
int slot_idx_;
/// Helper class for picking fields and rows from delimited text.
- boost::scoped_ptr<DelimitedTextParser<true>> delimited_text_parser_;
+ boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
/// Return field locations from the Delimited Text Parser.
std::vector<FieldLocation> field_locations_;
[4/5] impala git commit: IMPALA-5384, part 1: introduce DmlExecState
Posted by kw...@apache.org.
IMPALA-5384, part 1: introduce DmlExecState
This change is based on a patch by Marcel Kornacker.
Move data structures that collect DML operation stats from the
RuntimeState and Coordinator into a new DmlExecState class, which
has its own lock. This removes a dependency on the coordinator's
lock, which will allow further coordinator locking cleanup in the next
patch.
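A rough sketch of the locking pattern, in Python for brevity (the real
DmlExecState is C++ under be/src/runtime/; the method names mirror the
AddPartition()/UpdatePartition() calls visible in the sink diffs below, but
the bodies here are guesses, not the actual implementation). The point is
that per-partition stats live behind the class's own lock, so sinks and
backends can record DML progress without ever taking the coordinator's lock:

  import threading

  class DmlExecStateSketch(object):
      """Collects per-partition DML stats under its own lock."""

      def __init__(self):
          self._lock = threading.Lock()  # independent of the coordinator's lock
          self._num_modified_rows = {}   # partition name -> row count

      def add_partition(self, name):
          with self._lock:
              self._num_modified_rows.setdefault(name, 0)

      def update_partition(self, name, num_modified_rows):
          with self._lock:
              self._num_modified_rows[name] = (
                  self._num_modified_rows.get(name, 0) + num_modified_rows)

      def report(self):
          with self._lock:
              return dict(self._num_modified_rows)  # return a snapshot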
Change-Id: Id4c025917620a7bff2acbeb46464f107ab4b7565
Reviewed-on: http://gerrit.cloudera.org:8080/9793
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/408ee4d6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/408ee4d6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/408ee4d6
Branch: refs/heads/master
Commit: 408ee4d68cbb7217eba4b020064a1e878e412ce8
Parents: b78daed
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Mar 23 16:28:27 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 06:29:16 2018 +0000
----------------------------------------------------------------------
be/src/benchmarks/expr-benchmark.cc | 1 +
be/src/exec/catalog-op-executor.cc | 1 +
be/src/exec/data-sink.cc | 63 ---
be/src/exec/data-sink.h | 9 -
be/src/exec/hbase-table-sink.cc | 11 +-
be/src/exec/hdfs-table-sink.cc | 38 +-
be/src/exec/kudu-table-sink.cc | 22 +-
be/src/exec/plan-root-sink.cc | 1 +
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/coordinator-backend-state.cc | 11 +-
be/src/runtime/coordinator-backend-state.h | 18 +-
be/src/runtime/coordinator.cc | 361 ++---------------
be/src/runtime/coordinator.h | 84 +---
be/src/runtime/dml-exec-state.cc | 494 +++++++++++++++++++++++
be/src/runtime/dml-exec-state.h | 149 +++++++
be/src/runtime/query-state.cc | 13 +-
be/src/runtime/runtime-filter-bank.cc | 1 +
be/src/runtime/runtime-state.h | 29 +-
be/src/service/client-request-state.cc | 10 +-
be/src/service/client-request-state.h | 1 +
be/src/service/impala-beeswax-server.cc | 18 +-
be/src/service/impala-hs2-server.cc | 1 +
be/src/service/impala-http-handler.cc | 2 +
be/src/service/impala-server.cc | 2 +
be/src/service/impala-server.h | 3 -
be/src/testutil/in-process-servers.cc | 1 +
26 files changed, 750 insertions(+), 595 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b10a70f..1b995a5 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -53,6 +53,7 @@
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "service/fe-support.h"
+#include "service/frontend.h"
#include "service/impala-server.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 12398cf..164187c 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -25,6 +25,7 @@
#include "runtime/lib-cache.h"
#include "runtime/client-cache-types.h"
#include "runtime/exec-env.h"
+#include "service/frontend.h"
#include "service/impala-server.h"
#include "service/hs2-util.h"
#include "util/string-parser.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f8f068e..9140b3e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -121,69 +121,6 @@ Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, &output_exprs_);
}
-void DataSink::MergeDmlStats(const TInsertStats& src_stats,
- TInsertStats* dst_stats) {
- dst_stats->bytes_written += src_stats.bytes_written;
- if (src_stats.__isset.kudu_stats) {
- if (!dst_stats->__isset.kudu_stats) dst_stats->__set_kudu_stats(TKuduDmlStats());
- if (!dst_stats->kudu_stats.__isset.num_row_errors) {
- dst_stats->kudu_stats.__set_num_row_errors(0);
- }
- dst_stats->kudu_stats.__set_num_row_errors(
- dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
- }
- if (src_stats.__isset.parquet_stats) {
- if (dst_stats->__isset.parquet_stats) {
- MergeMapValues<string, int64_t>(src_stats.parquet_stats.per_column_size,
- &dst_stats->parquet_stats.per_column_size);
- } else {
- dst_stats->__set_parquet_stats(src_stats.parquet_stats);
- }
- }
-}
-
-string DataSink::OutputDmlStats(const PartitionStatusMap& stats,
- const string& prefix) {
- const char* indent = " ";
- stringstream ss;
- ss << prefix;
- bool first = true;
- for (const PartitionStatusMap::value_type& val: stats) {
- if (!first) ss << endl;
- first = false;
- ss << "Partition: ";
-
- const string& partition_key = val.first;
- if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
- ss << "Default" << endl;
- } else {
- ss << partition_key << endl;
- }
- if (val.second.__isset.num_modified_rows) {
- ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
- }
-
- if (!val.second.__isset.stats) continue;
- const TInsertStats& stats = val.second.stats;
- if (stats.__isset.kudu_stats) {
- ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
- }
-
- ss << indent << "BytesWritten: "
- << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
- if (stats.__isset.parquet_stats) {
- const TParquetInsertStats& parquet_stats = stats.parquet_stats;
- ss << endl << indent << "Per Column Sizes:";
- for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
- i != parquet_stats.per_column_size.end(); ++i) {
- ss << endl << indent << indent << i->first << ": "
- << PrettyPrinter::Print(i->second, TUnit::BYTES);
- }
- }
- }
- return ss.str();
-}
-
Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
DCHECK(parent_mem_tracker != nullptr);
DCHECK(profile_ != nullptr);
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d2e80af..605b46d 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -85,15 +85,6 @@ class DataSink {
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
- /// Merges one update to the DML stats for a partition. dst_stats will have the
- /// combined stats of src_stats and dst_stats after this method returns.
- static void MergeDmlStats(const TInsertStats& src_stats,
- TInsertStats* dst_stats);
-
- /// Outputs the DML stats contained in the map of partition updates to a string
- static std::string OutputDmlStats(const PartitionStatusMap& stats,
- const std::string& prefix = "");
-
MemTracker* mem_tracker() const { return mem_tracker_.get(); }
RuntimeProfile* profile() const { return profile_; }
const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 9e591ac..567e8bd 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -59,12 +59,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track
RETURN_IF_ERROR(hbase_table_writer_->Init(state));
// Add a 'root partition' status in which to collect insert statistics
- TInsertPartitionStatus root_status;
- root_status.__set_num_modified_rows(0L);
- root_status.__set_stats(TInsertStats());
- root_status.__set_id(-1L);
- state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
-
+ state->dml_exec_state()->AddPartition(ROOT_PARTITION_KEY, -1L, nullptr);
return Status::OK();
}
@@ -74,8 +69,8 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
RETURN_IF_ERROR(state->CheckQueryState());
// Since everything is set up just forward everything to the writer.
RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));
- (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
- batch->num_rows();
+ state->dml_exec_state()->UpdatePartition(
+ ROOT_PARTITION_KEY, batch->num_rows(), nullptr);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index c12b711..ca08bac 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -403,8 +403,9 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
COUNTER_ADD(files_created_counter_, 1);
if (!ShouldSkipStaging(state, output_partition)) {
- // Save the ultimate destination for this file (it will be moved by the coordinator)
- (*state->hdfs_files_to_move())[output_partition->current_file_name] = final_location;
+ // Save the ultimate destination for this file (it will be moved by the coordinator).
+ state->dml_exec_state()->AddFileToMove(
+ output_partition->current_file_name, final_location);
}
++output_partition->num_files;
@@ -573,21 +574,15 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
return status;
}
- // Save the partition name so that the coordinator can create the partition directory
- // structure if needed
- DCHECK(state->per_partition_status()->find(partition->partition_name) ==
- state->per_partition_status()->end());
- TInsertPartitionStatus partition_status;
- partition_status.__set_num_modified_rows(0L);
- partition_status.__set_id(partition_descriptor->id());
- partition_status.__set_stats(TInsertStats());
- partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
- state->per_partition_status()->insert(
- make_pair(partition->partition_name, partition_status));
+ // Save the partition name so that the coordinator can create the partition
+ // directory structure if needed.
+ state->dml_exec_state()->AddPartition(
+ partition->partition_name, partition_descriptor->id(),
+ &table_desc_->hdfs_base_dir());
if (!no_more_rows && !ShouldSkipStaging(state, partition.get())) {
- // Indicate that temporary directory is to be deleted after execution
- (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
+ // Indicate that temporary directory is to be deleted after execution.
+ state->dml_exec_state()->AddFileToMove(partition->tmp_hdfs_dir_name, "");
}
partition_keys_to_output_partitions_[key].first = std::move(partition);
@@ -643,17 +638,8 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
// OutputPartition writer could be nullptr if there is no row to output.
if (partition->writer.get() != nullptr) {
RETURN_IF_ERROR(partition->writer->Finalize());
-
- // Track total number of appended rows per partition in runtime
- // state. partition->num_rows counts number of rows appended is per-file.
- PartitionStatusMap::iterator it =
- state->per_partition_status()->find(partition->partition_name);
-
- // Should have been created in GetOutputPartition() when the partition was
- // initialised.
- DCHECK(it != state->per_partition_status()->end());
- it->second.num_modified_rows += partition->num_rows;
- DataSink::MergeDmlStats(partition->writer->stats(), &it->second.stats);
+ state->dml_exec_state()->UpdatePartition(
+ partition->partition_name, partition->num_rows, &partition->writer->stats());
}
RETURN_IF_ERROR(ClosePartitionFile(state, partition));
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 05f1d06..67bb86e 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -61,9 +61,6 @@ using kudu::client::KuduError;
namespace impala {
-const static string& ROOT_PARTITION_KEY =
- g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
@@ -92,15 +89,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
<< "TableDescriptor must be an instance KuduTableDescriptor.";
table_desc_ = static_cast<const KuduTableDescriptor*>(table_desc);
- // Add a 'root partition' status in which to collect write statistics
- TInsertPartitionStatus root_status;
- root_status.__set_num_modified_rows(0L);
- root_status.__set_id(-1L);
- TKuduDmlStats kudu_dml_stats;
- kudu_dml_stats.__set_num_row_errors(0L);
- root_status.__set_stats(TInsertStats());
- root_status.stats.__set_kudu_stats(kudu_dml_stats);
- state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
+ state->dml_exec_state()->InitForKuduDml();
// Add counters
total_rows_ = ADD_COUNTER(profile(), "TotalNumRows", TUnit::UNIT);
@@ -338,12 +327,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
}
Status status = CheckForErrors(state);
- TInsertPartitionStatus& insert_status =
- (*state->per_partition_status())[ROOT_PARTITION_KEY];
- insert_status.__set_num_modified_rows(
- total_rows_->value() - num_row_errors_->value());
- insert_status.stats.kudu_stats.__set_num_row_errors(num_row_errors_->value());
- insert_status.__set_kudu_latest_observed_ts(client_->GetLatestObservedTimestamp());
+ state->dml_exec_state()->SetKuduDmlStats(
+ total_rows_->value() - num_row_errors_->value(), num_row_errors_->value(),
+ client_->GetLatestObservedTimestamp());
return status;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 1cdc544..836a376 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -117,6 +117,7 @@ void PlanRootSink::Close(RuntimeState* state) {
unique_lock<mutex> l(lock_);
// No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
// well.
+ // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_?
sender_done_ = true;
consumer_cv_.NotifyAll();
// Wait for consumer to be done, in case sender tries to tear-down this sink while the
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0d4b61c..89cbbb9 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Runtime
data-stream-sender.cc
debug-options.cc
descriptors.cc
+ dml-exec-state.cc
exec-env.cc
fragment-instance-state.cc
hbase-table.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 914a3e4..e8db00e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -43,10 +43,7 @@ Coordinator::BackendState::BackendState(
const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
: query_id_(query_id),
state_idx_(state_idx),
- filter_mode_(filter_mode),
- rpc_latency_(0),
- rpc_sent_(false),
- peak_consumption_(0L) {
+ filter_mode_(filter_mode) {
}
void Coordinator::BackendState::Init(
@@ -413,11 +410,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
ObjectPool* obj_pool)
: exec_params_(exec_params),
- profile_(nullptr),
- done_(false),
- profile_created_(false),
- total_split_size_(0),
- total_ranges_complete_(0) {
+ profile_(nullptr) {
const string& profile_name = Substitute("Instance $0 (host=$1)",
PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
profile_ = RuntimeProfile::Create(obj_pool, profile_name);
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 0973ca3..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -161,24 +161,24 @@ class Coordinator::BackendState {
int64_t last_report_time_ms_ = 0;
/// owned by coordinator object pool provided in the c'tor, created in Update()
- RuntimeProfile* profile_;
+ RuntimeProfile* profile_ = nullptr;
/// true if the final report has been received for the fragment instance.
/// Used to handle duplicate done ReportExecStatus RPC messages. Used only
/// in ApplyExecStatusReport()
- bool done_;
+ bool done_ = false;
/// true after the first call to profile->Update()
- bool profile_created_;
+ bool profile_created_ = false;
/// cumulative size of all splits; set in c'tor
- int64_t total_split_size_;
+ int64_t total_split_size_ = 0;
/// wall clock timer for this instance
MonotonicStopWatch stopwatch_;
/// total scan ranges complete across all scan nodes
- int64_t total_ranges_complete_;
+ int64_t total_ranges_complete_ = 0;
/// SCAN_RANGES_COMPLETE_COUNTERs in profile_
std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
@@ -215,7 +215,7 @@ class Coordinator::BackendState {
boost::mutex lock_;
// number of in-flight instances
- int num_remaining_instances_;
+ int num_remaining_instances_ = 0;
/// If the status indicates an error status, execution has either been aborted by the
/// executing impalad (which then reported the error) or cancellation has been
@@ -235,15 +235,15 @@ class Coordinator::BackendState {
ErrorLogMap error_log_;
/// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
- int64_t rpc_latency_;
+ int64_t rpc_latency_ = 0;
/// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
/// successful.
- bool rpc_sent_;
+ bool rpc_sent_ = false;
/// peak memory used for this query (value of that node's query memtracker's
/// peak_consumption())
- int64_t peak_consumption_;
+ int64_t peak_consumption_ = 0;
/// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
int64_t last_report_time_ms_ = 0;
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index a5d53b2..e6b3bca 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -63,10 +63,7 @@ using std::unique_ptr;
DECLARE_int32(be_port);
DECLARE_string(hostname);
-DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
- "INSERTs will inherit the permissions of their parent directories");
-
-namespace impala {
+using namespace impala;
// Maximum number of fragment instances that can publish each broadcast filter.
static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
@@ -93,9 +90,6 @@ Status Coordinator::Exec() {
const TQueryExecRequest& request = schedule_.request();
DCHECK(request.plan_exec_info.size() > 0);
- needs_finalization_ = request.__isset.finalize_params;
- if (needs_finalization_) finalize_params_ = request.finalize_params;
-
VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
<< " stmt=" << request.query_ctx.client_request.stmt;
stmt_type_ = request.stmt_type;
@@ -489,291 +483,32 @@ Status Coordinator::UpdateStatus(const Status& status, const string& backend_hos
return query_status_;
}
-void Coordinator::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
- PermissionCache* permissions_cache) {
- // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
- // location (e.g. host:port) if so.
- int scheme_end = path_str.find("://");
- string stripped_str;
- if (scheme_end != string::npos) {
- // Skip past the subsequent location:port/ prefix.
- stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
- } else {
- stripped_str = path_str;
- }
-
- // Get the list of path components, used to build all path prefixes.
- vector<string> components;
- split(components, stripped_str, is_any_of("/"));
-
- // Build a set of all prefixes (including the complete string) of stripped_path. So
- // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
- vector<string> prefixes;
- // Stores the current prefix
- stringstream accumulator;
- for (const string& component: components) {
- if (component.empty()) continue;
- accumulator << "/" << component;
- prefixes.push_back(accumulator.str());
- }
-
- // Now for each prefix, stat() it to see if a) it exists and b) if so what its
- // permissions are. When we meet a directory that doesn't exist, we record the fact that
- // we need to create it, and the permissions of its parent dir to inherit.
- //
- // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
- // for each path. If we need to create the directory, we record it as the pair (true,
- // perms) so that the caller can identify which directories need their permissions
- // explicitly set.
-
- // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
- // current dir doesn't exist).
- short permissions = 0;
- for (const string& path: prefixes) {
- PermissionCache::const_iterator it = permissions_cache->find(path);
- if (it == permissions_cache->end()) {
- hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
- if (info != nullptr) {
- // File exists, so fill the cache with its current permissions.
- permissions_cache->insert(
- make_pair(path, make_pair(false, info->mPermissions)));
- permissions = info->mPermissions;
- hdfsFreeFileInfo(info, 1);
- } else {
- // File doesn't exist, so we need to set its permissions to its immediate parent
- // once it's been created.
- permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
- }
- } else {
- permissions = it->second.second;
- }
- }
-}
-
-Status Coordinator::FinalizeSuccessfulInsert() {
- PermissionCache permissions_cache;
- HdfsFsCache::HdfsFsMap filesystem_connection_cache;
- HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
-
- // INSERT finalization happens in the five following steps
- // 1. If OVERWRITE, remove all the files in the target directory
- // 2. Create all the necessary partition directories.
- HdfsTableDescriptor* hdfs_table;
- RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx_.desc_tbl,
- finalize_params_.table_id, obj_pool(), &hdfs_table));
- DCHECK(hdfs_table != nullptr)
- << "INSERT target table not known in descriptor table: "
- << finalize_params_.table_id;
-
- // Loop over all partitions that were updated by this insert, and create the set of
- // filesystem operations required to create the correct partition structure on disk.
- for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
- SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
- "FinalizationTimer"));
- // INSERT allows writes to tables that have partitions on multiple filesystems.
- // So we need to open connections to different filesystems as necessary. We use a
- // local connection cache and populate it with one connection per filesystem that the
- // partitions are on.
- hdfsFS partition_fs_connection;
- RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
- partition.second.partition_base_dir, &partition_fs_connection,
- &filesystem_connection_cache));
-
- // Look up the partition in the descriptor table.
- stringstream part_path_ss;
- if (partition.second.id == -1) {
- // If this is a non-existant partition, use the default partition location of
- // <base_dir>/part_key_1=val/part_key_2=val/...
- part_path_ss << finalize_params_.hdfs_base_dir << "/" << partition.first;
- } else {
- HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
- DCHECK(part != nullptr)
- << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id
- << "\n" << PrintThrift(runtime_state()->instance_ctx());
- part_path_ss << part->location();
- }
- const string& part_path = part_path_ss.str();
- bool is_s3_path = IsS3APath(part_path.c_str());
-
- // If this is an overwrite insert, we will need to delete any updated partitions
- if (finalize_params_.is_overwrite) {
- if (partition.first.empty()) {
- // If the root directory is written to, then the table must not be partitioned
- DCHECK(per_partition_status_.size() == 1);
- // We need to be a little more careful, and only delete data files in the root
- // because the tmp directories the sink(s) wrote are there also.
- // So only delete files in the table directory - all files are treated as data
- // files by Hive and Impala, but directories are ignored (and may legitimately
- // be used to store permanent non-table data by other applications).
- int num_files = 0;
- // hfdsListDirectory() only sets errno if there is an error, but it doesn't set
- // it to 0 if the call succeed. When there is no error, errno could be any
- // value. So need to clear errno before calling it.
- // Once HDFS-8407 is fixed, the errno reset won't be needed.
- errno = 0;
- hdfsFileInfo* existing_files =
- hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
- if (existing_files == nullptr && errno == EAGAIN) {
- errno = 0;
- existing_files =
- hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
- }
- // hdfsListDirectory() returns nullptr not only when there is an error but also
- // when the directory is empty(HDFS-8407). Need to check errno to make sure
- // the call fails.
- if (existing_files == nullptr && errno != 0) {
- return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
- }
- for (int i = 0; i < num_files; ++i) {
- const string filename = path(existing_files[i].mName).filename().string();
- if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
- partition_create_ops.Add(DELETE, existing_files[i].mName);
- }
- }
- hdfsFreeFileInfo(existing_files, num_files);
- } else {
- // This is a partition directory, not the root directory; we can delete
- // recursively with abandon, after checking that it ever existed.
- // TODO: There's a potential race here between checking for the directory
- // and a third-party deleting it.
- if (FLAGS_insert_inherit_permissions && !is_s3_path) {
- // There is no directory structure in S3, so "inheriting" permissions is not
- // possible.
- // TODO: Try to mimic inheriting permissions for S3.
- PopulatePathPermissionCache(
- partition_fs_connection, part_path, &permissions_cache);
- }
- // S3 doesn't have a directory structure, so we technically wouldn't need to
- // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
- // carrying out an operation on that path. So we still need to call CREATE_DIR
- // before we access that path due to this limitation.
- if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
- partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
- } else {
- // Otherwise just create the directory.
- partition_create_ops.Add(CREATE_DIR, part_path);
- }
- }
- } else if (!is_s3_path
- || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
- // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
- // would have already been created by the table sinks.
- if (FLAGS_insert_inherit_permissions && !is_s3_path) {
- PopulatePathPermissionCache(
- partition_fs_connection, part_path, &permissions_cache);
- }
- if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
- partition_create_ops.Add(CREATE_DIR, part_path);
- }
- }
- }
-
- // We're done with the HDFS descriptor - free up its resources.
- hdfs_table->ReleaseResources();
- hdfs_table = nullptr;
-
- {
- SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "Overwrite/PartitionCreationTimer",
- "FinalizationTimer"));
- if (!partition_create_ops.Execute(
- ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
- for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
- // It's ok to ignore errors creating the directories, since they may already
- // exist. If there are permission errors, we'll run into them later.
- if (err.first->op() != CREATE_DIR) {
- return Status(Substitute(
- "Error(s) deleting partition directories. First error (of $0) was: $1",
- partition_create_ops.errors().size(), err.second));
- }
- }
- }
- }
-
- // 3. Move all tmp files
- HdfsOperationSet move_ops(&filesystem_connection_cache);
- HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
-
- for (FileMoveMap::value_type& move: files_to_move_) {
- // Empty destination means delete, so this is a directory. These get deleted in a
- // separate pass to ensure that we have moved all the contents of the directory first.
- if (move.second.empty()) {
- VLOG_ROW << "Deleting file: " << move.first;
- dir_deletion_ops.Add(DELETE, move.first);
- } else {
- VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
- if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
- move_ops.Add(RENAME, move.first, move.second);
- } else {
- move_ops.Add(MOVE, move.first, move.second);
- }
- }
- }
-
- {
- SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileMoveTimer", "FinalizationTimer"));
- if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
- stringstream ss;
- ss << "Error(s) moving partition files. First error (of "
- << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
- return Status(ss.str());
- }
- }
-
- // 4. Delete temp directories
- {
- SCOPED_TIMER(ADD_CHILD_TIMER(query_profile_, "FileDeletionTimer",
- "FinalizationTimer"));
- if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
- stringstream ss;
- ss << "Error(s) deleting staging directories. First error (of "
- << dir_deletion_ops.errors().size() << ") was: "
- << dir_deletion_ops.errors()[0].second;
- return Status(ss.str());
- }
- }
-
- // 5. Optionally update the permissions of the created partition directories
- // Do this last so that we don't make a dir unwritable before we write to it.
- if (FLAGS_insert_inherit_permissions) {
- HdfsOperationSet chmod_ops(&filesystem_connection_cache);
- for (const PermissionCache::value_type& perm: permissions_cache) {
- bool new_dir = perm.second.first;
- if (new_dir) {
- short permissions = perm.second.second;
- VLOG_QUERY << "INSERT created new directory: " << perm.first
- << ", inherited permissions are: " << oct << permissions;
- chmod_ops.Add(CHMOD, perm.first, permissions);
- }
- }
- if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
- stringstream ss;
- ss << "Error(s) setting permissions on newly created partition directories. First"
- << " error (of " << chmod_ops.errors().size() << ") was: "
- << chmod_ops.errors()[0].second;
- return Status(ss.str());
- }
- }
-
- return Status::OK();
-}
-
-Status Coordinator::FinalizeQuery() {
+Status Coordinator::FinalizeHdfsInsert() {
// All instances must have reported their final statuses before finalization, which is a
// post-condition of Wait. If the query was not successful, still try to clean up the
// staging directory.
DCHECK(has_called_wait_);
- DCHECK(needs_finalization_);
+ DCHECK(finalize_params() != nullptr);
VLOG_QUERY << "Finalizing query: " << query_id();
SCOPED_TIMER(finalization_timer_);
Status return_status = GetStatus();
if (return_status.ok()) {
- return_status = FinalizeSuccessfulInsert();
+ HdfsTableDescriptor* hdfs_table;
+ RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
+ finalize_params()->table_id, obj_pool(), &hdfs_table));
+ DCHECK(hdfs_table != nullptr)
+ << "INSERT target table not known in descriptor table: "
+ << finalize_params()->table_id;
+ return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
+ query_ctx().client_request.query_options.s3_skip_insert_staging,
+ hdfs_table, query_profile_);
+ hdfs_table->ReleaseResources();
}
stringstream staging_dir;
- DCHECK(finalize_params_.__isset.staging_dir);
- staging_dir << finalize_params_.staging_dir << "/" << PrintId(query_id(),"_") << "/";
+ DCHECK(finalize_params()->__isset.staging_dir);
+ staging_dir << finalize_params()->staging_dir << "/" << PrintId(query_id(),"_") << "/";
hdfsFS hdfs_conn;
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(staging_dir.str(), &hdfs_conn));
@@ -813,26 +548,26 @@ Status Coordinator::Wait() {
}
DCHECK_EQ(stmt_type_, TStmtType::DML);
- // Query finalization can only happen when all backends have reported
- // relevant state. They only have relevant state to report in the parallel
- // INSERT case, otherwise all the relevant state is from the coordinator
- // fragment which will be available after Open() returns.
- // Ignore the returned status if finalization is required., since FinalizeQuery() will
- // pick it up and needs to execute regardless.
+ // Query finalization can only happen when all backends have reported relevant
+ // state. They only have relevant state to report in the parallel INSERT case,
+ // otherwise all the relevant state is from the coordinator fragment which will be
+ // available after Open() returns. Ignore the returned status if finalization is
+ // required, since FinalizeHdfsInsert() will pick it up and needs to execute
+ // regardless.
Status status = WaitForBackendCompletion();
- if (!needs_finalization_ && !status.ok()) return status;
+ if (finalize_params() == nullptr && !status.ok()) return status;
// Execution of query fragments has finished. We don't need to hold onto query execution
// resources while we finalize the query.
ReleaseExecResources();
// Query finalization is required only for HDFS table sinks
- if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
+ if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
// Release admission control resources after we'd done the potentially heavyweight
// finalization.
ReleaseAdmissionControlResources();
query_profile_->AddInfoString(
- "DML Stats", DataSink::OutputDmlStats(per_partition_status_, "\n"));
+ "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
// For DML queries, when Wait is done, the query is complete.
ComputeQuerySummary();
return status;
@@ -929,7 +664,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
// TODO: only do this when the sink is done; probably missing a done field
// in TReportExecStatus for that
if (params.__isset.insert_exec_status) {
- UpdateInsertExecStatus(params.insert_exec_status);
+ dml_exec_state_.Update(params.insert_exec_status);
}
if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
@@ -971,52 +706,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
return Status::OK();
}
-void Coordinator::UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status) {
- lock_guard<mutex> l(lock_);
- for (const PartitionStatusMap::value_type& partition:
- insert_exec_status.per_partition_status) {
- TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
- status->__set_num_modified_rows(
- status->num_modified_rows + partition.second.num_modified_rows);
- status->__set_kudu_latest_observed_ts(std::max(
- partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
- status->__set_id(partition.second.id);
- status->__set_partition_base_dir(partition.second.partition_base_dir);
-
- if (partition.second.__isset.stats) {
- if (!status->__isset.stats) status->__set_stats(TInsertStats());
- DataSink::MergeDmlStats(partition.second.stats, &status->stats);
- }
- }
- files_to_move_.insert(
- insert_exec_status.files_to_move.begin(), insert_exec_status.files_to_move.end());
-}
-
-
-uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
- uint64_t max_ts = 0;
- for (const auto& entry : per_partition_status_) {
- max_ts = std::max(max_ts,
- static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
- }
- return max_ts;
-}
-
RuntimeState* Coordinator::runtime_state() {
return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
}
-bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
- // Assume we are called only after all fragments have completed
- DCHECK(has_called_wait_);
-
- for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
- catalog_update->created_partitions.insert(partition.first);
- }
-
- return catalog_update->created_partitions.size() != 0;
-}
-
// TODO: add histogram/percentile
void Coordinator::ComputeQuerySummary() {
// In this case, the query did not even get to start all fragment instances.
@@ -1285,4 +978,8 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
}
doc->AddMember("backend_instances", states, doc->GetAllocator());
}
+
+const TFinalizeParams* Coordinator::finalize_params() const {
+ return schedule_.request().__isset.finalize_params
+ ? &schedule_.request().finalize_params : nullptr;
}
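
The finalize_params() accessor above folds the old needs_finalization_ flag and the copied TFinalizeParams member into a single nullable pointer: nullptr now doubles as "no HDFS INSERT finalization required". A small self-contained sketch of that pattern, with hypothetical stand-in types:

#include <iostream>

// Hypothetical stand-ins for TQueryExecRequest/TFinalizeParams, for illustration.
struct FinalizeParams { bool is_overwrite = false; };
struct ExecRequest {
  bool isset_finalize_params = false;
  FinalizeParams finalize_params;
};

class MiniCoordinator {
 public:
  explicit MiniCoordinator(const ExecRequest& req) : request_(req) {}
  // Same idea as Coordinator::finalize_params(): nullptr replaces the old
  // needs_finalization_ boolean, so there is one source of truth.
  const FinalizeParams* finalize_params() const {
    return request_.isset_finalize_params ? &request_.finalize_params : nullptr;
  }
 private:
  ExecRequest request_;
};

int main() {
  MiniCoordinator select_query{ExecRequest{}};
  MiniCoordinator insert_query{ExecRequest{true, FinalizeParams{true}}};
  std::cout << (select_query.finalize_params() == nullptr) << "\n";  // 1: skip finalize
  std::cout << (insert_query.finalize_params() != nullptr) << "\n";  // 1: finalize
  return 0;
}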
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d630b9a..6665c08 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,7 +38,8 @@
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
+#include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
#include "scheduling/query-schedule.h"
#include "util/condition-variable.h"
#include "util/progress-updater.h"
@@ -130,9 +131,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Idempotent.
void Cancel(const Status* cause = nullptr);
- /// Updates execution status of a particular backend as well as Insert-related
- /// status (per_partition_status_ and files_to_move_). Also updates
- /// num_remaining_backends_ and cancels execution if the backend has an error status.
+ /// Updates execution status of a particular backend as well as dml_exec_state_.
+ /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+ /// error status.
Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
WARN_UNUSED_RESULT;
@@ -149,17 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
MemTracker* query_mem_tracker() const;
/// This is safe to call only after Wait()
- const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
-
- /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
- /// was executed, or 0 if there were no Kudu timestamps reported.
- /// This should only be called after Wait().
- uint64_t GetLatestKuduInsertTimestamp() const;
-
- /// Gathers all updates to the catalog required once this query has completed execution.
- /// Returns true if a catalog update is required, false otherwise.
- /// Must only be called after Wait()
- bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+ DmlExecState* dml_exec_state() { return &dml_exec_state_; }
/// Return error log for coord and all the fragments. The error messages from the
/// individual fragment instances are merged into a single output to retain readability.
@@ -229,12 +220,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// GetNext() hits eos.
PlanRootSink* coord_sink_ = nullptr;
- /// True if the query needs a post-execution step to tidy up
- bool needs_finalization_ = false;
-
- /// Only valid if needs_finalization is true
- TFinalizeParams finalize_params_;
-
/// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this
boost::mutex wait_lock_;
@@ -275,6 +260,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
ExecSummary exec_summary_;
+ /// Filled in as the query completes and tracks the results of DML queries. This is
+ /// either the union of the reports from all fragment instances, or taken from the
+ /// coordinator fragment: only one of the two can legitimately produce updates.
+ DmlExecState dml_exec_state_;
+
/// Aggregate counters for the entire query. Lives in 'obj_pool_'.
RuntimeProfile* query_profile_ = nullptr;
@@ -308,21 +298,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// hits 0, any Wait()'ing thread is notified
int num_remaining_backends_ = 0;
- /// The following two structures, partition_row_counts_ and files_to_move_ are filled in
- /// as the query completes, and track the results of INSERT queries that alter the
- /// structure of tables. They are either the union of the reports from all fragment
- /// instances, or taken from the coordinator fragment: only one of the two can
- /// legitimately produce updates.
-
- /// The set of partitions that have been written to or updated by all fragment
- /// instances, along with statistics such as the number of rows written (may be 0). For
- /// unpartitioned tables, the empty string denotes the entire table.
- PartitionStatusMap per_partition_status_;
-
- /// The set of files to move after an INSERT query has run, in (src, dest) form. An
- /// empty string for the destination means that a file is to be deleted.
- FileMoveMap files_to_move_;
-
/// Event timeline for this query. Not owned.
RuntimeProfile::EventSequence* query_events_ = nullptr;
@@ -357,6 +332,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Returns a local object pool.
ObjectPool* obj_pool() { return obj_pool_.get(); }
+ /// Returns request's finalize params, or nullptr if not present. If not present, then
+ /// HDFS INSERT finalization is not required.
+ const TFinalizeParams* finalize_params() const;
+
+ const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+
/// Only valid *after* calling Exec(). Return nullptr if the running query does not
/// produce any rows.
RuntimeState* runtime_state();
@@ -381,9 +362,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
Status UpdateStatus(const Status& status, const std::string& backend_hostname,
bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
- /// Update per_partition_status_ and files_to_move_.
- void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);
-
/// Returns only when either all execution backends have reported success or the query
/// is in error. Returns the status of the query.
/// It is safe to call this concurrently, but any calls must be made only after Exec().
@@ -403,29 +381,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// profiles must not be updated while this is running.
void ComputeQuerySummary();
- /// TODO: move the next 3 functions into a separate class
-
- /// Determines what the permissions of directories created by INSERT statements should
- /// be if permission inheritance is enabled. Populates a map from all prefixes of
- /// path_str (including the full path itself) which is a path in Hdfs, to pairs
- /// (does_not_exist, permissions), where does_not_exist is true if the path does not
- /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
- /// the most immediate ancestor of the path that does exist, i.e. the permissions that
- /// the path should inherit when created. Otherwise permissions is set to the actual
- /// permissions of the path. The PermissionCache argument is also used to cache the
- /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
- /// same path.
- typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
- void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
- PermissionCache* permissions_cache);
-
- /// Moves all temporary staging files to their final destinations.
- Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT;
-
- /// Perform any post-query cleanup required. Called by Wait() only after all fragment
- /// instances have returned, or if the query has failed, in which case it only cleans up
- /// temporary data rather than finishing the INSERT in flight.
- Status FinalizeQuery() WARN_UNUSED_RESULT;
+ /// Perform any post-query cleanup required for HDFS (or other Hadoop FileSystem)
+ /// INSERT. Called by Wait() only after all fragment instances have returned, or if
+ /// the query has failed, in which case it only cleans up temporary data rather than
+ /// finishing the INSERT in flight.
+ Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
/// Populates backend_states_, starts query execution at all backends in parallel, and
/// blocks until startup completes.
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
new file mode 100644
index 0000000..6853da5
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.cc
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/dml-exec-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/join.hpp>
+#include <boost/filesystem.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "util/pretty-printer.h"
+#include "util/container-util.h"
+#include "util/hdfs-bulk-ops.h"
+#include "util/hdfs-util.h"
+#include "util/runtime-profile-counters.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/exec-env.h"
+#include "gen-cpp/ImpalaService_types.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include "common/names.h"
+
+DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
+ "INSERTs will inherit the permissions of their parent directories");
+
+using namespace impala;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+
+string DmlExecState::OutputPartitionStats(const string& prefix) {
+ lock_guard<mutex> l(lock_);
+ const char* indent = " ";
+ stringstream ss;
+ ss << prefix;
+ bool first = true;
+ for (const PartitionStatusMap::value_type& val: per_partition_status_) {
+ if (!first) ss << endl;
+ first = false;
+ ss << "Partition: ";
+ const string& partition_key = val.first;
+ if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
+ ss << "Default" << endl;
+ } else {
+ ss << partition_key << endl;
+ }
+ if (val.second.__isset.num_modified_rows) {
+ ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+ }
+
+ if (!val.second.__isset.stats) continue;
+ const TInsertStats& stats = val.second.stats;
+ if (stats.__isset.kudu_stats) {
+ ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+ }
+
+ ss << indent << "BytesWritten: "
+ << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
+ if (stats.__isset.parquet_stats) {
+ const TParquetInsertStats& parquet_stats = stats.parquet_stats;
+ ss << endl << indent << "Per Column Sizes:";
+ for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
+ i != parquet_stats.per_column_size.end(); ++i) {
+ ss << endl << indent << indent << i->first << ": "
+ << PrettyPrinter::Print(i->second, TUnit::BYTES);
+ }
+ }
+ }
+ return ss.str();
+}
+
+void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) {
+ lock_guard<mutex> l(lock_);
+ for (const PartitionStatusMap::value_type& partition:
+ dml_exec_status.per_partition_status) {
+ TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
+ status->__set_num_modified_rows(
+ status->num_modified_rows + partition.second.num_modified_rows);
+ status->__set_kudu_latest_observed_ts(max<uint64_t>(
+ partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
+ status->__set_id(partition.second.id);
+ status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+ if (partition.second.__isset.stats) {
+ if (!status->__isset.stats) status->__set_stats(TInsertStats());
+ MergeDmlStats(partition.second.stats, &status->stats);
+ }
+ }
+ files_to_move_.insert(
+ dml_exec_status.files_to_move.begin(), dml_exec_status.files_to_move.end());
+}
+
+void DmlExecState::AddFileToMove(const string& file_name, const string& location) {
+ lock_guard<mutex> l(lock_);
+ files_to_move_[file_name] = location;
+}
+
+uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
+ lock_guard<mutex> l(lock_);
+ uint64_t max_ts = 0;
+ for (const auto& entry : per_partition_status_) {
+ max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts);
+ }
+ return max_ts;
+}
+
+int64_t DmlExecState::GetNumModifiedRows() {
+ lock_guard<mutex> l(lock_);
+ int64_t result = 0;
+ for (const PartitionStatusMap::value_type& p: per_partition_status_) {
+ result += p.second.num_modified_rows;
+ }
+ return result;
+}
+
+bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
+ lock_guard<mutex> l(lock_);
+ for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+ catalog_update->created_partitions.insert(partition.first);
+ }
+ return catalog_update->created_partitions.size() != 0;
+}
+
+Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
+ bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
+ RuntimeProfile* profile) {
+ lock_guard<mutex> l(lock_);
+ PermissionCache permissions_cache;
+ HdfsFsCache::HdfsFsMap filesystem_connection_cache;
+ HdfsOperationSet partition_create_ops(&filesystem_connection_cache);
+
+ // INSERT finalization happens in the five following steps
+ // 1. If OVERWRITE, remove all the files in the target directory
+ // 2. Create all the necessary partition directories.
+
+ // Loop over all partitions that were updated by this insert, and create the set of
+ // filesystem operations required to create the correct partition structure on disk.
+ for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+ SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+ "FinalizationTimer"));
+ // INSERT allows writes to tables that have partitions on multiple filesystems.
+ // So we need to open connections to different filesystems as necessary. We use a
+ // local connection cache and populate it with one connection per filesystem that the
+ // partitions are on.
+ hdfsFS partition_fs_connection;
+ RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+ partition.second.partition_base_dir, &partition_fs_connection,
+ &filesystem_connection_cache));
+
+ // Look up the partition in the descriptor table.
+ stringstream part_path_ss;
+ if (partition.second.id == -1) {
+ // If this is a non-existent partition, use the default partition location of
+ // <base_dir>/part_key_1=val/part_key_2=val/...
+ part_path_ss << params.hdfs_base_dir << "/" << partition.first;
+ } else {
+ HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
+ DCHECK(part != nullptr)
+ << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id;
+ part_path_ss << part->location();
+ }
+ const string& part_path = part_path_ss.str();
+ bool is_s3_path = IsS3APath(part_path.c_str());
+
+ // If this is an overwrite insert, we will need to delete any updated partitions
+ if (params.is_overwrite) {
+ if (partition.first.empty()) {
+ // If the root directory is written to, then the table must not be partitioned
+ DCHECK(per_partition_status_.size() == 1);
+ // We need to be a little more careful, and only delete data files in the root
+ // because the tmp directories the sink(s) wrote are there also.
+ // So only delete files in the table directory - all files are treated as data
+ // files by Hive and Impala, but directories are ignored (and may legitimately
+ // be used to store permanent non-table data by other applications).
+ int num_files = 0;
+ // hdfsListDirectory() only sets errno if there is an error, but it doesn't set
+ // it to 0 if the call succeeds. When there is no error, errno could be any
+ // value. So need to clear errno before calling it.
+ // Once HDFS-8407 is fixed, the errno reset won't be needed.
+ errno = 0;
+ hdfsFileInfo* existing_files =
+ hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+ if (existing_files == nullptr && errno == EAGAIN) {
+ errno = 0;
+ existing_files =
+ hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
+ }
+ // hdfsListDirectory() returns nullptr not only when there is an error but also
+ // when the directory is empty (HDFS-8407). Need to check errno to make sure
+ // the call actually failed.
+ if (existing_files == nullptr && errno != 0) {
+ return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
+ }
+ for (int i = 0; i < num_files; ++i) {
+ const string filename =
+ boost::filesystem::path(existing_files[i].mName).filename().string();
+ if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
+ partition_create_ops.Add(DELETE, existing_files[i].mName);
+ }
+ }
+ hdfsFreeFileInfo(existing_files, num_files);
+ } else {
+ // This is a partition directory, not the root directory; we can delete
+ // recursively with abandon, after checking that it ever existed.
+ // TODO: There's a potential race here between checking for the directory
+ // and a third-party deleting it.
+ if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+ // There is no directory structure in S3, so "inheriting" permissions is not
+ // possible.
+ // TODO: Try to mimic inheriting permissions for S3.
+ PopulatePathPermissionCache(
+ partition_fs_connection, part_path, &permissions_cache);
+ }
+ // S3 doesn't have a directory structure, so we technically wouldn't need to
+ // CREATE_DIR on S3. However, libhdfs always checks if a path exists before
+ // carrying out an operation on that path. So we still need to call CREATE_DIR
+ // before we access that path due to this limitation.
+ if (hdfsExists(partition_fs_connection, part_path.c_str()) != -1) {
+ partition_create_ops.Add(DELETE_THEN_CREATE, part_path);
+ } else {
+ // Otherwise just create the directory.
+ partition_create_ops.Add(CREATE_DIR, part_path);
+ }
+ }
+ } else if (!is_s3_path || !s3_skip_insert_staging) {
+ // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
+ // would have already been created by the table sinks.
+ if (FLAGS_insert_inherit_permissions && !is_s3_path) {
+ PopulatePathPermissionCache(
+ partition_fs_connection, part_path, &permissions_cache);
+ }
+ if (hdfsExists(partition_fs_connection, part_path.c_str()) == -1) {
+ partition_create_ops.Add(CREATE_DIR, part_path);
+ }
+ }
+ }
+
+ {
+ SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
+ "FinalizationTimer"));
+ if (!partition_create_ops.Execute(
+ ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+ for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
+ // It's ok to ignore errors creating the directories, since they may already
+ // exist. If there are permission errors, we'll run into them later.
+ if (err.first->op() != CREATE_DIR) {
+ return Status(Substitute(
+ "Error(s) deleting partition directories. First error (of $0) was: $1",
+ partition_create_ops.errors().size(), err.second));
+ }
+ }
+ }
+ }
+
+ // 3. Move all tmp files
+ HdfsOperationSet move_ops(&filesystem_connection_cache);
+ HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
+
+ for (FileMoveMap::value_type& move: files_to_move_) {
+ // Empty destination means delete, so this is a directory. These get deleted in a
+ // separate pass to ensure that we have moved all the contents of the directory first.
+ if (move.second.empty()) {
+ VLOG_ROW << "Deleting file: " << move.first;
+ dir_deletion_ops.Add(DELETE, move.first);
+ } else {
+ VLOG_ROW << "Moving tmp file: " << move.first << " to " << move.second;
+ if (FilesystemsMatch(move.first.c_str(), move.second.c_str())) {
+ move_ops.Add(RENAME, move.first, move.second);
+ } else {
+ move_ops.Add(MOVE, move.first, move.second);
+ }
+ }
+ }
+
+ {
+ SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileMoveTimer", "FinalizationTimer"));
+ if (!move_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+ stringstream ss;
+ ss << "Error(s) moving partition files. First error (of "
+ << move_ops.errors().size() << ") was: " << move_ops.errors()[0].second;
+ return Status(ss.str());
+ }
+ }
+
+ // 4. Delete temp directories
+ {
+ SCOPED_TIMER(ADD_CHILD_TIMER(profile, "FileDeletionTimer", "FinalizationTimer"));
+ if (!dir_deletion_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+ stringstream ss;
+ ss << "Error(s) deleting staging directories. First error (of "
+ << dir_deletion_ops.errors().size() << ") was: "
+ << dir_deletion_ops.errors()[0].second;
+ return Status(ss.str());
+ }
+ }
+
+ // 5. Optionally update the permissions of the created partition directories
+ // Do this last so that we don't make a dir unwritable before we write to it.
+ if (FLAGS_insert_inherit_permissions) {
+ HdfsOperationSet chmod_ops(&filesystem_connection_cache);
+ for (const PermissionCache::value_type& perm: permissions_cache) {
+ bool new_dir = perm.second.first;
+ if (new_dir) {
+ short permissions = perm.second.second;
+ VLOG_QUERY << "INSERT created new directory: " << perm.first
+ << ", inherited permissions are: " << oct << permissions;
+ chmod_ops.Add(CHMOD, perm.first, permissions);
+ }
+ }
+ if (!chmod_ops.Execute(ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
+ stringstream ss;
+ ss << "Error(s) setting permissions on newly created partition directories. First"
+ << " error (of " << chmod_ops.errors().size() << ") was: "
+ << chmod_ops.errors()[0].second;
+ return Status(ss.str());
+ }
+ }
+ return Status::OK();
+}
+
+void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str,
+ PermissionCache* permissions_cache) {
+ // Find out if the path begins with a hdfs:// -style prefix, and remove it and the
+ // location (e.g. host:port) if so.
+ int scheme_end = path_str.find("://");
+ string stripped_str;
+ if (scheme_end != string::npos) {
+ // Skip past the subsequent location:port/ prefix.
+ stripped_str = path_str.substr(path_str.find("/", scheme_end + 3));
+ } else {
+ stripped_str = path_str;
+ }
+
+ // Get the list of path components, used to build all path prefixes.
+ vector<string> components;
+ split(components, stripped_str, is_any_of("/"));
+
+ // Build a set of all prefixes (including the complete string) of stripped_str. So
+ // /a/b/c/d leads to a vector of: /a, /a/b, /a/b/c, /a/b/c/d
+ vector<string> prefixes;
+ // Stores the current prefix
+ stringstream accumulator;
+ for (const string& component: components) {
+ if (component.empty()) continue;
+ accumulator << "/" << component;
+ prefixes.push_back(accumulator.str());
+ }
+
+ // Now for each prefix, stat() it to see if a) it exists and b) if so what its
+ // permissions are. When we meet a directory that doesn't exist, we record the fact that
+ // we need to create it, and the permissions of its parent dir to inherit.
+ //
+ // Every prefix is recorded in the PermissionCache so we don't do more than one stat()
+ // for each path. If we need to create the directory, we record it as the pair (true,
+ // perms) so that the caller can identify which directories need their permissions
+ // explicitly set.
+
+ // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
+ // current dir doesn't exist).
+ short permissions = 0;
+ for (const string& path: prefixes) {
+ PermissionCache::const_iterator it = permissions_cache->find(path);
+ if (it == permissions_cache->end()) {
+ hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
+ if (info != nullptr) {
+ // File exists, so fill the cache with its current permissions.
+ permissions_cache->insert(
+ make_pair(path, make_pair(false, info->mPermissions)));
+ permissions = info->mPermissions;
+ hdfsFreeFileInfo(info, 1);
+ } else {
+ // File doesn't exist, so we need to set its permissions to its immediate parent
+ // once it's been created.
+ permissions_cache->insert(make_pair(path, make_pair(true, permissions)));
+ }
+ } else {
+ permissions = it->second.second;
+ }
+ }
+}
+
+bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+ lock_guard<mutex> l(lock_);
+ bool set_thrift = false;
+ if (files_to_move_.size() > 0) {
+ dml_status->__set_files_to_move(files_to_move_);
+ set_thrift = true;
+ }
+ if (per_partition_status_.size() > 0) {
+ dml_status->__set_per_partition_status(per_partition_status_);
+ set_thrift = true;
+ }
+ return set_thrift;
+}
+
+void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
+ lock_guard<mutex> l(lock_);
+ int64_t num_row_errors = 0;
+ bool has_kudu_stats = false;
+ for (const PartitionStatusMap::value_type& v: per_partition_status_) {
+ insert_result->rows_modified[v.first] = v.second.num_modified_rows;
+ if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+ has_kudu_stats = true;
+ }
+ num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+ }
+ if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+}
+
+void DmlExecState::AddPartition(
+ const string& name, int64_t id, const string* base_dir) {
+ lock_guard<mutex> l(lock_);
+ DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
+ TInsertPartitionStatus status;
+ status.__set_num_modified_rows(0L);
+ status.__set_id(id);
+ status.__isset.stats = true;
+ if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+ per_partition_status_.insert(make_pair(name, status));
+}
+
+void DmlExecState::UpdatePartition(const string& partition_name,
+ int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+ lock_guard<mutex> l(lock_);
+ PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+ DCHECK(entry != per_partition_status_.end());
+ entry->second.num_modified_rows += num_modified_rows_delta;
+ if (insert_stats == nullptr) return;
+ MergeDmlStats(*insert_stats, &entry->second.stats);
+}
+
+void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
+ dst->bytes_written += src.bytes_written;
+ if (src.__isset.kudu_stats) {
+ dst->__isset.kudu_stats = true;
+ if (!dst->kudu_stats.__isset.num_row_errors) {
+ dst->kudu_stats.__set_num_row_errors(0);
+ }
+ dst->kudu_stats.__set_num_row_errors(
+ dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+ }
+ if (src.__isset.parquet_stats) {
+ if (dst->__isset.parquet_stats) {
+ MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
+ &dst->parquet_stats.per_column_size);
+ } else {
+ dst->__set_parquet_stats(src.parquet_stats);
+ }
+ }
+}
+
+void DmlExecState::InitForKuduDml() {
+ // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+ const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+ lock_guard<mutex> l(lock_);
+ DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
+ TInsertPartitionStatus status;
+ status.__set_num_modified_rows(0L);
+ status.__set_id(-1L);
+ status.__isset.stats = true;
+ status.stats.__isset.kudu_stats = true;
+ per_partition_status_.insert(make_pair(partition_name, status));
+}
+
+void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+ int64_t latest_ts) {
+ // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
+ const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+ lock_guard<mutex> l(lock_);
+ PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
+ DCHECK(entry != per_partition_status_.end());
+ entry->second.__set_num_modified_rows(num_modified_rows);
+ entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
+ entry->second.__set_kudu_latest_observed_ts(latest_ts);
+}
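
The prefix-building step inside PopulatePathPermissionCache() above is easy to check in isolation. Below is a runnable sketch using only the standard library; the real function goes on to hdfsGetPathInfo() each prefix and fill the PermissionCache. Splitting here uses std::getline rather than boost::split, and the sketch shares the real code's assumption that a scheme-qualified path contains a '/' after the host:port authority.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Illustrative reimplementation of the prefix-building step only.
std::vector<std::string> BuildPathPrefixes(const std::string& path_str) {
  // Strip an hdfs://host:port style scheme+authority prefix, if present.
  std::string stripped = path_str;
  std::string::size_type scheme_end = path_str.find("://");
  if (scheme_end != std::string::npos) {
    stripped = path_str.substr(path_str.find('/', scheme_end + 3));
  }
  // Accumulate /a, /a/b, /a/b/c, ... including the full path.
  std::vector<std::string> prefixes;
  std::stringstream accumulator;
  std::stringstream in(stripped);
  std::string component;
  while (std::getline(in, component, '/')) {
    if (component.empty()) continue;
    accumulator << "/" << component;
    prefixes.push_back(accumulator.str());
  }
  return prefixes;
}

int main() {
  for (const std::string& p : BuildPathPrefixes("hdfs://nn:8020/a/b/c/d")) {
    std::cout << p << "\n";  // prints /a, /a/b, /a/b/c, /a/b/c/d
  }
  return 0;
}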
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
new file mode 100644
index 0000000..728284a
--- /dev/null
+++ b/be/src/runtime/dml-exec-state.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DML_EXEC_STATE_H
+#define IMPALA_RUNTIME_DML_EXEC_STATE_H
+
+#include <string>
+#include <map>
+#include <boost/unordered_map.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "common/status.h"
+
+namespace impala {
+
+class TInsertExecStatus;
+class TInsertResult;
+class TInsertStats;
+class TFinalizeParams;
+class TUpdateCatalogRequest;
+class TInsertPartitionStatus;
+class RuntimeProfile;
+class HdfsTableDescriptor;
+
+/// DmlExecState manages the state related to the execution of a DML statement
+/// (creation of new files, new partitions, etc.).
+///
+/// During DML execution, the table sink adds per-partition status using AddPartition()
+/// and then UpdatePartition() for non-Kudu tables. For Kudu tables, the sink adds DML
+/// stats using InitForKuduDml() followed by SetKuduDmlStats(). In the case of the
+/// HDFS sink, it will also record the collection of files that should be moved by the
+/// coordinator on finalization using AddFileToMove().
+///
+/// The state is then serialized to thrift and merged at the coordinator using
+/// Update(). The coordinator will then use OutputPartitionStats(),
+/// GetKuduLatestObservedTimestamp(), PrepareCatalogUpdate() and FinalizeHdfsInsert()
+/// to perform various finalization tasks.
+///
+
+/// Thread-safe.
+class DmlExecState {
+ public:
+ /// Merge values from 'dml_exec_status'.
+ void Update(const TInsertExecStatus& dml_exec_status);
+
+ /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
+ /// It is an error to call this for an existing partition.
+ void AddPartition(const std::string& name, int64_t id, const std::string* base_dir);
+
+ /// Merge given values into stats for partition with name 'partition_name'.
+ /// Ignores 'insert_stats' if nullptr.
+ /// Requires that the partition already exist.
+ void UpdatePartition(const std::string& partition_name,
+ int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
+
+ /// Used to initialize this state when executing Kudu DML. Must be called before
+ /// SetKuduDmlStats().
+ void InitForKuduDml();
+
+ /// Update stats for a Kudu DML sink. Requires that InitForKuduDml() was already called.
+ void SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
+ int64_t latest_ts);
+
+ /// Adds new file/location to the move map.
+ void AddFileToMove(const std::string& file_name, const std::string& location);
+
+ /// Outputs the partition stats to a string.
+ std::string OutputPartitionStats(const std::string& prefix);
+
+ /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
+ /// was executed, or 0 if there were no Kudu timestamps reported.
+ uint64_t GetKuduLatestObservedTimestamp();
+
+ /// Return the total number of modified rows across all partitions.
+ int64_t GetNumModifiedRows();
+
+ /// Populates 'catalog_update' with PartitionStatusMap data.
+ /// Returns true if a catalog update is required, false otherwise.
+ bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+
+ /// For HDFS (and other Hadoop FileSystem) INSERT, moves all temporary staging files
+ /// to their final destinations, as indicated by 'params', and creates new partitions
+ /// for 'hdfs_table' as required. Adds child timers to profile for the various
+ /// stages of finalization. If the table is on an S3 path and
+ /// 's3_skip_insert_staging' is true, does not create new partition directories.
+ Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
+ HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;
+
+ /// Serialize to thrift. Returns true if any fields of 'dml_status' were set.
+ bool ToThrift(TInsertExecStatus* dml_status);
+
+ /// Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
+ /// Beeswax.
+ void ToTInsertResult(TInsertResult* insert_result);
+
+ private:
+ /// Protects all fields below.
+ boost::mutex lock_;
+
+ /// Counts how many rows a DML query has added to a particular partition (partitions
+ /// are identified by their partition keys: k1=v1/k2=v2, etc.). Unpartitioned tables
+ /// have a single 'default' partition which is identified by ROOT_PARTITION_KEY.
+ /// Uses ordered map so that iteration order is deterministic.
+ typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
+ PartitionStatusMap per_partition_status_;
+
+ /// Tracks files to move from a temporary (key) to a final destination (value) as
+ /// part of query finalization. If the destination is empty, the file is to be
+ /// deleted. Uses ordered map so that iteration order is deterministic.
+ typedef std::map<std::string, std::string> FileMoveMap;
+ FileMoveMap files_to_move_;
+
+ /// Determines what the permissions of directories created by INSERT statements should
+ /// be if permission inheritance is enabled. Populates a map from all prefixes of
+ /// 'path_str' (including the full path itself) which is a path in Hdfs, to pairs
+ /// (does_not_exist, permissions), where does_not_exist is true if the path does not
+ /// exist in Hdfs. If does_not_exist is true, permissions is set to the permissions of
+ /// the most immediate ancestor of the path that does exist, i.e. the permissions that
+ /// the path should inherit when created. Otherwise permissions is set to the actual
+ /// permissions of the path. The PermissionCache argument is also used to cache the
+ /// output across repeated calls, to avoid repeatedly calling hdfsGetPathInfo() on the
+ /// same path.
+ typedef boost::unordered_map<std::string, std::pair<bool, short>> PermissionCache;
+ void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
+ PermissionCache* permissions_cache);
+
+ /// Merge 'src' into 'dst'. Not thread-safe.
+ void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
+};
+
+}
+
+#endif
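
As a sanity check on the merge semantics described in the class comment above, here is a simplified, self-contained model of Update(): row counts accumulate across backend reports, while the Kudu timestamp takes the maximum seen. Types are illustrative stand-ins, not Impala's Thrift structs.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Simplified stand-ins for the per-partition status map.
struct PartStatus {
  int64_t num_modified_rows = 0;
  uint64_t kudu_latest_observed_ts = 0;
};
using PartitionStatusMap = std::map<std::string, PartStatus>;

// Models DmlExecState::Update(): merge one backend's report into the
// coordinator's map, inserting a zeroed entry for unseen partitions.
void Update(PartitionStatusMap* coord, const PartitionStatusMap& report) {
  for (const auto& kv : report) {
    PartStatus* s = &(*coord)[kv.first];
    s->num_modified_rows += kv.second.num_modified_rows;
    s->kudu_latest_observed_ts =
        std::max(s->kudu_latest_observed_ts, kv.second.kudu_latest_observed_ts);
  }
}

int main() {
  PartitionStatusMap coord;
  Update(&coord, {{"year=2018", {100, 0}}});  // report from backend 1
  Update(&coord, {{"year=2018", {50, 0}}});   // report from backend 2
  std::cout << coord["year=2018"].num_modified_rows << "\n";  // 150
  return 0;
}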
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1cc646d..ad5748f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -236,18 +236,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
// Only send updates to insert status if fragment is finished, the coordinator waits
// until query execution is done to use them anyhow.
RuntimeState* state = fis->runtime_state();
- if (done && (state->hdfs_files_to_move()->size() > 0
- || state->per_partition_status()->size() > 0)) {
- TInsertExecStatus insert_status;
- if (state->hdfs_files_to_move()->size() > 0) {
- insert_status.__set_files_to_move(*state->hdfs_files_to_move());
- }
- if (state->per_partition_status()->size() > 0) {
- insert_status.__set_per_partition_status(*state->per_partition_status());
- }
- params.__set_insert_exec_status(insert_status);
+ if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) {
+ params.__isset.insert_exec_status = true;
}
-
// Send new errors to coordinator
state->GetUnreportedErrors(&params.error_log);
params.__isset.error_log = (params.error_log.size() > 0);
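
The hunk above collapses the hand-rolled TInsertExecStatus assembly into a single
ToThrift() call. Based on the deleted lines and the members declared in the new
header, the method plausibly reduces to the following; this is a reconstruction,
not the patch's actual body (which is in dml-exec-state.cc, not shown here).

    // Hedged reconstruction from the deleted code above; lock_, files_to_move_
    // and per_partition_status_ are the members declared in dml-exec-state.h.
    bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
      boost::lock_guard<boost::mutex> l(lock_);
      bool set = false;
      if (!files_to_move_.empty()) {
        dml_status->__set_files_to_move(files_to_move_);
        set = true;
      }
      if (!per_partition_status_.empty()) {
        dml_status->__set_per_partition_status(per_partition_status_);
        set = true;
      }
      return set;
    }

Returning whether anything was set lets the caller fill in
__isset.insert_exec_status exactly as the new code above does.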
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 239e066..4e23a42 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index cd2b061..4b005b2 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
#include "common/global-types.h" // for PlanNodeId
#include "runtime/client-cache-types.h"
#include "runtime/thread-resource-mgr.h"
+#include "runtime/dml-exec-state.h"
#include "util/runtime-profile.h"
#include "gen-cpp/ImpalaInternalService_types.h"
@@ -56,22 +57,6 @@ namespace io {
class DiskIoMgr;
}
-/// TODO: move the typedefs into a separate .h (and fix the includes for that)
-
-/// Counts how many rows an INSERT query has added to a particular partition
-/// (partitions are identified by their partition keys: k1=v1/k2=v2
-/// etc. Unpartitioned tables have a single 'default' partition which is
-/// identified by ROOT_PARTITION_KEY.
-typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
-
-/// Stats per partition for insert queries. They key is the same as for PartitionRowCount
-typedef std::map<std::string, TInsertStats> PartitionInsertStats;
-
-/// Tracks files to move from a temporary (key) to a final destination (value) as
-/// part of query finalization. If the destination is empty, the file is to be
-/// deleted.
-typedef std::map<std::string, std::string> FileMoveMap;
-
/// A collection of items that are part of the global state of a query and shared across
/// all execution nodes of that query. After initialisation, callers must call
/// ReleaseResources() to ensure that all resources are correctly freed before
@@ -133,8 +118,6 @@ class RuntimeState {
}
ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
- FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-
void set_fragment_root_id(PlanNodeId id) {
DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
root_node_id_ = id;
@@ -146,7 +129,7 @@ class RuntimeState {
RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
- PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
+ DmlExecState* dml_exec_state() { return &dml_exec_state_; }
/// Returns runtime state profile
RuntimeProfile* runtime_profile() { return profile_; }
@@ -341,12 +324,8 @@ class RuntimeState {
/// state is responsible for returning this pool to the thread mgr.
ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
- /// Temporary Hdfs files created, and where they should be moved to ultimately.
- /// Mapping a filename to a blank destination causes it to be deleted.
- FileMoveMap hdfs_files_to_move_;
-
- /// Records summary statistics for the results of inserts into Hdfs partitions.
- PartitionStatusMap per_partition_status_;
+ /// Execution state for DML statements.
+ DmlExecState dml_exec_state_;
RuntimeProfile* const profile_;
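
With the typedefs and maps gone from RuntimeState, call sites that used to write
into hdfs_files_to_move() or per_partition_status() directly now go through the
aggregate object instead. The shape of that migration, with a hypothetical
mutator name (this mail does not show DmlExecState's full mutator API):

    // Before (removed accessor):
    //   (*state->hdfs_files_to_move())[staging_path] = final_path;
    // After (assumed shape; AddFileToMove() is a hypothetical name):
    //   state->dml_exec_state()->AddFileToMove(staging_path, final_path);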
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ac6178c..2aedcab 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
#include <limits>
#include <gutil/strings/substitute.h>
+#include "runtime/coordinator.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -570,7 +571,8 @@ void ClientRequestState::Done() {
// must happen before taking lock_ below.
if (coord_.get() != NULL) {
// This is safe to access on coord_ after Wait() has been called.
- uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+ uint64_t latest_kudu_ts =
+ coord_->dml_exec_state()->GetKuduLatestObservedTimestamp();
if (latest_kudu_ts > 0) {
VLOG_RPC << "Updating session (id=" << session_id() << ") with latest "
<< "observed Kudu timestamp: " << latest_kudu_ts;
@@ -918,7 +920,7 @@ Status ClientRequestState::UpdateCatalog() {
catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
catalog_update.__set_header(TCatalogServiceRequestHeader());
catalog_update.header.__set_requesting_user(effective_user());
- if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+ if (!coord()->dml_exec_state()->PrepareCatalogUpdate(&catalog_update)) {
VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
<< query_id() << ")";
} else {
@@ -1027,9 +1029,7 @@ void ClientRequestState::SetCreateTableAsSelectResultSet() {
// operation.
if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
DCHECK(coord_.get());
- for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
- total_num_rows_inserted += p.second.num_modified_rows;
- }
+ total_num_rows_inserted = coord_->dml_exec_state()->GetNumModifiedRows();
}
const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
VLOG_QUERY << summary_msg;
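
Both changes in this file swap an inline traversal of coordinator state for a
named accessor: GetKuduLatestObservedTimestamp() for the session's Kudu
timestamp, and GetNumModifiedRows() for the CTAS row count. From the loop it
replaces, GetNumModifiedRows() presumably sums num_modified_rows across
partitions, roughly as sketched here (an inference, not the patch's text):

    // Inferred from the deleted loop above; lock_ and per_partition_status_
    // are the members declared in dml-exec-state.h.
    int64_t DmlExecState::GetNumModifiedRows() {
      boost::lock_guard<boost::mutex> l(lock_);
      int64_t result = 0;
      for (const PartitionStatusMap::value_type& p : per_partition_status_) {
        result += p.second.num_modified_rows;
      }
      return result;
    }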
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0c05bc4..657f3de 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -24,6 +24,7 @@
#include "scheduling/query-schedule.h"
#include "service/child-query.h"
#include "service/impala-server.h"
+#include "service/query-result-set.h"
#include "util/auth-util.h"
#include "util/condition-variable.h"
#include "util/runtime-profile.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b827ff3..c441285 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -20,10 +20,12 @@
#include "common/logging.h"
#include "gen-cpp/Frontend_types.h"
#include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/raw-value.inline.h"
#include "runtime/timestamp-value.h"
#include "service/client-request-state.h"
+#include "service/frontend.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/auth-util.h"
@@ -563,22 +565,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
// Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
// need to revisit this, since that might lead us to insert a row without a
// coordinator, depending on how we choose to drive the table sink.
- int64_t num_row_errors = 0;
- bool has_kudu_stats = false;
if (request_state->coord() != nullptr) {
- for (const PartitionStatusMap::value_type& v:
- request_state->coord()->per_partition_status()) {
- const pair<string, TInsertPartitionStatus> partition_status = v;
- insert_result->rows_modified[partition_status.first] =
- partition_status.second.num_modified_rows;
-
- if (partition_status.second.__isset.stats &&
- partition_status.second.stats.__isset.kudu_stats) {
- has_kudu_stats = true;
- }
- num_row_errors += partition_status.second.stats.kudu_stats.num_row_errors;
- }
- if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
+ request_state->coord()->dml_exec_state()->ToTInsertResult(insert_result);
}
}
}
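
The Beeswax close path condenses the same way. A hedged reconstruction of
ToTInsertResult() from the loop deleted above (the actual definition is
elsewhere in this patch):

    // Mirrors the deleted Beeswax code: per-partition row counts always go into
    // 'rows_modified'; num_row_errors is only set when Kudu stats are present.
    void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
      boost::lock_guard<boost::mutex> l(lock_);
      int64_t num_row_errors = 0;
      bool has_kudu_stats = false;
      for (const PartitionStatusMap::value_type& v : per_partition_status_) {
        insert_result->rows_modified[v.first] = v.second.num_modified_rows;
        if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
          has_kudu_stats = true;
        }
        num_row_errors += v.second.stats.kudu_stats.num_row_errors;
      }
      if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
    }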
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4381f81..80ace87 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -34,6 +34,7 @@
#include "common/logging.h"
#include "common/version.h"
#include "rpc/thrift-util.h"
+#include "runtime/coordinator.h"
#include "runtime/raw-value.h"
#include "runtime/exec-env.h"
#include "service/hs2-util.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/408ee4d6/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 43b03d1..3841bfe 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,6 +24,7 @@
#include "catalog/catalog-util.h"
#include "gen-cpp/beeswax_types.h"
+#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
@@ -31,6 +32,7 @@
#include "runtime/timestamp-value.inline.h"
#include "service/impala-server.h"
#include "service/client-request-state.h"
+#include "service/frontend.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/coding-util.h"
#include "util/logging-support.h"