Posted to commits@impala.apache.org by kw...@apache.org on 2016/12/02 02:16:39 UTC

[1/6] incubator-impala git commit: IMPALA-4562: Fix for crash on kerberized clusters w/o Kudu support

Repository: incubator-impala
Updated Branches:
  refs/heads/master 83f777da7 -> da34ce978


IMPALA-4562: Fix for crash on kerberized clusters w/o Kudu support

Commit de88f0c4af3a07ae6bd6b8c94edcb8748468f522,
"IMPALA-4497: Fix Kudu client crash w/ SASL initialization",
causes a crash on secure clusters where Kudu is not
supported:

   kudu::client::DisableSaslInitialization() from libkudu_client.so.0
   impala::InitAuth(std::string const&) ()
   impala::InitCommonRuntime() ()
   ImpaladMain(int, char**) ()
   main ()

This ensures Impala does not call the Kudu client to handle
SASL init on systems where libkudu_client.so is a stub.
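
In outline, the fix guards the Kudu call behind impala::KuduIsAvailable(),
so builds where libkudu_client.so is only a stub never enter the client
library at all. A condensed sketch of the guarded call (the full context
is in the be/src/rpc/authentication.cc hunk below):

  if (impala::KuduIsAvailable()) {
    // Only hand SASL initialization over to the Kudu client when a real
    // libkudu_client.so is present; on stub builds this call crashes.
    KUDU_RETURN_IF_ERROR(kudu::client::DisableSaslInitialization(),
        "Unable to disable Kudu SASL initialization.");
  }

As the comment in the diff notes, this still has to run before any
KuduClient is created so that Kudu does not initialize SASL first.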

Change-Id: Ib517d17ab12e215fe87f35bc5d03cdda736ff672
Reviewed-on: http://gerrit.cloudera.org:8080/5295
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1b8fede3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1b8fede3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1b8fede3

Branch: refs/heads/master
Commit: 1b8fede3b5b3f5aff9d3fb3ac74f956da870186d
Parents: 83f777d
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Nov 30 16:25:51 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 1 21:06:49 2016 +0000

----------------------------------------------------------------------
 be/src/rpc/authentication.cc | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1b8fede3/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index cf6432a..db0e608 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -643,8 +643,10 @@ Status InitAuth(const string& appname) {
     // Impala's SASL initialization. This must be called before any KuduClients are
     // created to ensure that Kudu doesn't init SASL first, and this returns an error if
     // Kudu has already initialized SASL.
-    KUDU_RETURN_IF_ERROR(kudu::client::DisableSaslInitialization(),
-        "Unable to disable Kudu SASL initialization.");
+    if (impala::KuduIsAvailable()) {
+      KUDU_RETURN_IF_ERROR(kudu::client::DisableSaslInitialization(),
+          "Unable to disable Kudu SASL initialization.");
+    }
 
     // Add our auxprop plugin, which gives us a hook before authentication
     int rc = sasl_auxprop_add_plugin(IMPALA_AUXPROP_PLUGIN.c_str(), &ImpalaAuxpropInit);


[3/6] incubator-impala git commit: Fix E2E test infrastructure to handle missing exceptions correctly

Posted by kw...@apache.org.
Fix E2E test infrastructure to handle missing exceptions correctly

This change fixes a bug in the E2E infrastructure code that handles
the case where an expected exception wasn't thrown. The code
expected test_section['CATCH'] to be a string, but in reality it
is a list of strings. This change also clarifies the error message
about the missing exception and enforces that the CATCH subsection
in tests cannot be empty.

Change-Id: I7d83c5db59e8a239e4e70694a1e625af6f21419c
Reviewed-on: http://gerrit.cloudera.org:8080/5260
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a41918d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a41918d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a41918d4

Branch: refs/heads/master
Commit: a41918d443217d5e3205f6e9987d0c857f7ef7ef
Parents: f3fe2cf
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Nov 28 15:40:16 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 1 23:43:03 2016 +0000

----------------------------------------------------------------------
 tests/common/impala_test_suite.py | 6 +++++-
 tests/util/test_file_parser.py    | 2 ++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a41918d4/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 222f709..38849db 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -329,7 +329,11 @@ class ImpalaTestSuite(BaseTestSuite):
           self.__restore_query_options(query_options_changed, target_impalad_client)
 
       if 'CATCH' in test_section:
-        assert test_section['CATCH'].strip() == ''
+        expected_str = " or ".join(test_section['CATCH']).strip() \
+          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
+          .replace('$NAMENODE', NAMENODE) \
+          .replace('$IMPALA_HOME', IMPALA_HOME)
+        assert False, "Expected exception: %s" % expected_str
 
       assert result is not None
       assert result.success

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a41918d4/tests/util/test_file_parser.py
----------------------------------------------------------------------
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 886d8b7..9850b18 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -225,6 +225,8 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True):
           parsed_sections['CATCH'].extend(lines_content)
         else:
           raise RuntimeError, 'Unknown subsection comment: %s' % subsection_comment
+        for exception_str in parsed_sections['CATCH']:
+          assert exception_str.strip(), "Empty exception string."
         continue
 
       # The DML_RESULTS section is used to specify what the state of the table should be


[5/6] incubator-impala git commit: IMPALA-4564, IMPALA-4565: mt_dop fixes for old aggs and joins

Posted by kw...@apache.org.
IMPALA-4564, IMPALA-4565: mt_dop fixes for old aggs and joins

Fix a test bug: the nested types tests need to be skipped for the old
aggs and joins.

Fix a product bug where *eos is not initialised by the MT scan node.
This causes incorrect results when the calling ExecNode, e.g. the sort
node or the old agg and join nodes, does not itself initialise the eos
variable.
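
To see why an uninitialised *eos is dangerous, consider a caller that
loops until the callee reports end-of-stream. The toy sketch below is
illustrative only (a hypothetical ToyScanner, not Impala code); the
stale 'true' stands in for whatever garbage a caller such as the sort
node would otherwise read from the never-written flag:

  #include <iostream>
  #include <vector>

  // Stand-in for a scan node that, like the bug, only writes *eos on the
  // end-of-stream path and never resets it at the start of GetNext().
  struct ToyScanner {
    int calls = 0;
    void GetNext(std::vector<int>* batch, bool* eos) {
      *batch = {calls++};
      if (calls == 3) *eos = true;  // bug: no '*eos = false;' on earlier calls
    }
  };

  int main() {
    ToyScanner scanner;
    bool eos = true;  // simulates the uninitialised flag of a caller that
                      // relies on the callee to set it, e.g. the sort node
    std::vector<int> batch;
    do {
      scanner.GetNext(&batch, &eos);
      std::cout << batch[0] << std::endl;
    } while (!eos);   // exits after one batch instead of three
    return 0;
  }

The one-line fix below sets *eos = false at the top of
HdfsScanNodeMt::GetNext() so callers see a well-defined value on every
call.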

Testing:
Added a test that reproduces the incorrect results with the sort node
when run under ASAN.

Ran the mt_dop tests locally with old aggs and joins to ensure they
pass.

Change-Id: I48c50c8aa0c23710eb099fba252bc3c0cb74b313
Reviewed-on: http://gerrit.cloudera.org:8080/5302
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b3740612
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b3740612
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b3740612

Branch: refs/heads/master
Commit: b3740612065d742f7ddb96154da64c590867759e
Parents: 56f4d0f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Nov 30 21:53:27 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Dec 2 01:46:55 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-mt.cc                |  1 +
 .../QueryTest/mt-dop-parquet-nested.test        | 34 ++++++++++++++++++++
 .../queries/QueryTest/mt-dop-parquet.test       | 33 -------------------
 .../queries/QueryTest/mt-dop.test               | 19 +++++++++++
 tests/query_test/test_mt_dop.py                 |  6 ++++
 5 files changed, 60 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3740612/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index bde9f81..50936b8 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -67,6 +67,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
+  *eos = false;
 
   DCHECK(scan_range_ == NULL || scanner_ != NULL);
   if (scan_range_ == NULL || scanner_->eos()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3740612/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-nested.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-nested.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-nested.test
new file mode 100644
index 0000000..9be983d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-nested.test
@@ -0,0 +1,34 @@
+====
+---- QUERY
+# IMPALA-4554: Memory corruption of nested collection with MT_DOP > 0.
+select id, cnt
+from functional_parquet.complextypestbl t,
+  (select count(item) cnt from t.int_array) v
+order by id
+limit 10
+---- RESULTS
+1,3
+2,3
+3,0
+4,0
+5,0
+6,0
+7,0
+8,1
+---- TYPES
+bigint,bigint
+====
+---- QUERY
+# IMPALA-4458: Test proper resource cleanup for cancelled fragments.
+# This test is duplicated from nested-types-subplan.test
+select c_custkey, c_mktsegment, o_orderkey, o_orderdate
+from tpch_nested_parquet.customer c, c.c_orders o
+where c_custkey = 1
+limit 3
+---- RESULTS
+1,regex:.*,regex:.*,regex:.*
+1,regex:.*,regex:.*,regex:.*
+1,regex:.*,regex:.*,regex:.*
+---- TYPES
+bigint,string,bigint,string
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3740612/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
index 0523f1d..39ec4b3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
@@ -5,36 +5,3 @@ select * from functional_parquet.bad_metadata_len
 ---- CATCH
 Invalid metadata size in file footer
 ====
----- QUERY
-# IMPALA-4554: Memory corruption of nested collection with MT_DOP > 0.
-select id, cnt
-from functional_parquet.complextypestbl t,
-  (select count(item) cnt from t.int_array) v
-order by id
-limit 10
----- RESULTS
-1,3
-2,3
-3,0
-4,0
-5,0
-6,0
-7,0
-8,1
----- TYPES
-bigint,bigint
-====
----- QUERY
-# IMPALA-4458: Test proper resource cleanup for cancelled fragments.
-# This test is duplicated from nested-types-subplan.test
-select c_custkey, c_mktsegment, o_orderkey, o_orderdate
-from tpch_nested_parquet.customer c, c.c_orders o
-where c_custkey = 1
-limit 3
----- RESULTS
-1,regex:.*,regex:.*,regex:.*
-1,regex:.*,regex:.*,regex:.*
-1,regex:.*,regex:.*,regex:.*
----- TYPES
-bigint,string,bigint,string
-====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3740612/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
index ac453ca..a46693a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
@@ -7,3 +7,22 @@ select count(*) from alltypes
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# IMPALA-4565: incorrect results because mt scan node does not set eos
+# correctly and sort node only gets the first row batch.
+set batch_size=1;
+select id
+from alltypestiny
+order by id
+---- TYPES
+int
+---- RESULTS
+0
+1
+2
+3
+4
+5
+6
+7
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b3740612/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index d05d6b4..ff60b60 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -21,6 +21,7 @@ import pytest
 
 from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_vector import TestDimension
 from tests.common.test_vector import TestVector
 
@@ -88,3 +89,8 @@ class TestMtDopParquet(ImpalaTestSuite):
   def test_parquet(self, vector):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/mt-dop-parquet', vector)
+
+  @SkipIfOldAggsJoins.nested_types
+  def test_parquet_nested(self, vector):
+    vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop-parquet-nested', vector)


[4/6] incubator-impala git commit: IMPALA-4504: fix races in PlanFragmentExecutor regarding status reporting

Posted by kw...@apache.org.
IMPALA-4504: fix races in PlanFragmentExecutor regarding status reporting

The PlanFragmentExecutor has some state shared between the main
execution thread and the periodic reporting thread that isn't
synchronized properly.  IMPALA-4504 describes one such problem, and that
bug was introduced in an attempt to fix another similar race.

Let's just simplify all of this and remove this shared state.  Instead,
the profile thread will always be responsible for sending periodic
'!done' messages, and the main execution thread will always be
responsible for sending the final 'done' message (after joining the
periodic thread).

This will allow for even more simplification, in particular of the
interaction between FragmentExecState and PlanFragmentExecutor, but
I'm not doing that now, so as to avoid more conflicts with the MT work.
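
The division of labour is easy to state in a self-contained sketch
(hypothetical names, std::thread and std:: synchronisation primitives
instead of Impala's Thread class and the boost types used in the real
code): the reporting thread only ever sends non-final reports, and the
execution thread joins it before sending the single final report, so
nothing has to deduplicate the 'done' message.

  #include <chrono>
  #include <condition_variable>
  #include <iostream>
  #include <mutex>
  #include <thread>

  class Reporter {
   public:
    void Start() {
      std::lock_guard<std::mutex> l(lock_);
      active_ = true;
      thread_ = std::thread(&Reporter::ReportLoop, this);
    }

    // Called exactly once by the execution thread when the fragment finishes.
    void Complete(bool ok) {
      {
        std::lock_guard<std::mutex> l(lock_);
        active_ = false;
      }
      cv_.notify_one();
      thread_.join();                 // reporting thread has exited ...
      SendReport(/*done=*/true, ok);  // ... so the final report is sent once
    }

   private:
    void ReportLoop() {
      std::unique_lock<std::mutex> l(lock_);
      while (active_) {
        cv_.wait_for(l, std::chrono::seconds(5));
        if (!active_) break;
        SendReport(/*done=*/false, /*ok=*/true);  // periodic "!done" report
      }
    }

    void SendReport(bool done, bool ok) {
      std::cout << (done ? "final" : "periodic") << " report, ok=" << ok << "\n";
    }

    std::mutex lock_;
    std::condition_variable cv_;
    bool active_ = false;
    std::thread thread_;
  };

  int main() {
    Reporter r;
    r.Start();
    std::this_thread::sleep_for(std::chrono::seconds(1));  // fragment "executes"
    r.Complete(/*ok=*/true);
    return 0;
  }

Joining before the final send is the same reasoning that lets the diff
below delete completed_report_sent_ and status_lock_.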

Change-Id: I052b7b4fabb341ad27ad294cd5b0a53728d87d0e
Reviewed-on: http://gerrit.cloudera.org:8080/5250
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/56f4d0f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/56f4d0f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/56f4d0f5

Branch: refs/heads/master
Commit: 56f4d0f592f129054985786287ad8ed8c241fc74
Parents: a41918d
Author: Dan Hecht <dh...@cloudera.com>
Authored: Mon Nov 28 15:38:12 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Dec 2 00:35:02 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/plan-fragment-executor.cc | 101 +++++++++-----------------
 be/src/runtime/plan-fragment-executor.h  |  65 +++++++----------
 be/src/service/fragment-exec-state.cc    |  18 ++---
 be/src/service/fragment-exec-state.h     |   6 +-
 4 files changed, 68 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index cfa4863..07dcfb0 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -76,8 +76,12 @@ PlanFragmentExecutor::PlanFragmentExecutor(
     report_thread_active_(false),
     closed_(false),
     has_thread_token_(false),
+    timings_profile_(NULL),
+    root_sink_(NULL),
     is_prepared_(false),
     is_cancelled_(false),
+    per_host_mem_usage_(NULL),
+    rows_produced_counter_(NULL),
     average_thread_tokens_(NULL),
     mem_usage_sampled_counter_(NULL),
     thread_usage_sampled_counter_(NULL) {}
@@ -91,6 +95,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
   Status status = PrepareInternal(request);
   prepared_promise_.Set(status);
+  if (!status.ok()) FragmentComplete(status);
   return status;
 }
 
@@ -287,11 +292,12 @@ void PlanFragmentExecutor::PrintVolumeIds(
 }
 
 Status PlanFragmentExecutor::Open() {
+  DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok());
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
   Status status = OpenInternal();
-  UpdateStatus(status);
+  if (!status.ok()) FragmentComplete(status);
   opened_promise_.Set(status);
   return status;
 }
@@ -301,17 +307,16 @@ Status PlanFragmentExecutor::OpenInternal() {
   RETURN_IF_ERROR(
       runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
 
-  // we need to start the profile-reporting thread before calling exec_tree_->Open(),
-  // since it
-  // may block
+  // We need to start the profile-reporting thread before calling exec_tree_->Open(),
+  // since it may block.
   if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
     unique_lock<mutex> l(report_thread_lock_);
     report_thread_.reset(
         new Thread("plan-fragment-executor", "report-profile",
-            &PlanFragmentExecutor::ReportProfile, this));
-    // make sure the thread started up, otherwise ReportProfile() might get into a race
-    // with StopReportThread()
-    report_thread_started_cv_.wait(l);
+            &PlanFragmentExecutor::ReportProfileThread, this));
+    // Make sure the thread started up, otherwise ReportProfileThread() might get into
+    // a race with StopReportThread().
+    while (!report_thread_active_) report_thread_started_cv_.wait(l);
   }
 
   RETURN_IF_ERROR(OptimizeLlvmModule());
@@ -324,29 +329,22 @@ Status PlanFragmentExecutor::OpenInternal() {
 }
 
 Status PlanFragmentExecutor::Exec() {
+  DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok());
   SCOPED_TIMER(profile()->total_time_counter());
   Status status;
   {
     // Must go out of scope before FragmentComplete(), otherwise counter will not be
     // updated by time final profile is sent, and will always be 0.
     SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
-    {
-      lock_guard<mutex> l(status_lock_);
-      RETURN_IF_ERROR(status_);
-    }
     status = ExecInternal();
   }
-
-  // If there's no error, ExecInternal() completed the fragment instance's execution.
-  if (status.ok()) {
-    FragmentComplete();
-  } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) {
-    // Log error message in addition to returning in Status. Queries that do not
-    // fetch results (e.g. insert) may not receive the message directly and can
-    // only retrieve the log.
+  if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
+    // Log error message in addition to returning in Status. Queries that do not fetch
+    // results (e.g. insert) may not receive the message directly and can only retrieve
+    // the log.
     runtime_state_->LogError(status.msg());
   }
-  UpdateStatus(status);
+  FragmentComplete(status);
   return status;
 }
 
@@ -376,8 +374,9 @@ Status PlanFragmentExecutor::ExecInternal() {
   return Status::OK();
 }
 
-void PlanFragmentExecutor::ReportProfile() {
-  VLOG_FILE << "ReportProfile(): instance_id=" << runtime_state_->fragment_instance_id();
+void PlanFragmentExecutor::ReportProfileThread() {
+  VLOG_FILE << "ReportProfileThread(): instance_id="
+            << runtime_state_->fragment_instance_id();
   DCHECK(!report_status_cb_.empty());
   unique_lock<mutex> l(report_thread_lock_);
   // tell Open() that we started
@@ -414,40 +413,27 @@ void PlanFragmentExecutor::ReportProfile() {
     }
 
     if (!report_thread_active_) break;
-
-    if (completed_report_sent_.Load() == 0) {
-      // No complete fragment report has been sent.
-      SendReport(false);
-    }
+    SendReport(false, Status::OK());
   }
 
   VLOG_FILE << "exiting reporting thread: instance_id="
       << runtime_state_->fragment_instance_id();
 }
 
-void PlanFragmentExecutor::SendReport(bool done) {
+void PlanFragmentExecutor::SendReport(bool done, const Status& status) {
+  DCHECK(status.ok() || done);
   if (report_status_cb_.empty()) return;
 
-  Status status;
-  {
-    lock_guard<mutex> l(status_lock_);
-    status = status_;
-  }
-
-  // If status is not OK, we need to make sure that only one sender sends a 'done'
-  // response.
-  // TODO: Clean all this up - move 'done' reporting to Close()?
-  if (!done && !status.ok()) {
-    done = completed_report_sent_.CompareAndSwap(0, 1);
-  }
-
   // Update the counter for the peak per host mem usage.
-  per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
+  if (per_host_mem_usage_ != nullptr) {
+    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
+  }
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
   // be waiting for a final report and profile.
-  report_status_cb_(status, profile(), done);
+  RuntimeProfile* prof = is_prepared_ ? profile() : nullptr;
+  report_status_cb_(status, prof, done);
 }
 
 void PlanFragmentExecutor::StopReportThread() {
@@ -460,29 +446,11 @@ void PlanFragmentExecutor::StopReportThread() {
   report_thread_->Join();
 }
 
-void PlanFragmentExecutor::FragmentComplete() {
-  // Check the atomic flag. If it is set, then a fragment complete report has already
-  // been sent.
-  bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
+void PlanFragmentExecutor::FragmentComplete(const Status& status) {
   ReleaseThreadToken();
   StopReportThread();
-  if (send_report) SendReport(true);
-}
-
-void PlanFragmentExecutor::UpdateStatus(const Status& status) {
-  if (status.ok()) return;
-
-  bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
-
-  {
-    lock_guard<mutex> l(status_lock_);
-    if (status_.ok()) {
-      status_ = status;
-    }
-  }
-
-  StopReportThread();
-  if (send_report) SendReport(true);
+  // It's safe to send final report now that the reporting thread is stopped.
+  SendReport(true, status);
 }
 
 void PlanFragmentExecutor::Cancel() {
@@ -520,6 +488,9 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
 }
 
 void PlanFragmentExecutor::Close() {
+  DCHECK(!has_thread_token_);
+  DCHECK(!report_thread_active_);
+
   if (closed_) return;
   if (!is_prepared_) return;
   if (sink_.get() != nullptr) sink_->Close(runtime_state());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 22f2653..f93dab4 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -62,14 +62,12 @@ class TPlanExecParams;
 /// includes profile information for the plan itself as well as the output sink.
 ///
 /// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
-/// execution status. The frequency of those reports is controlled by the flag
+/// execution profile. The frequency of those reports is controlled by the flag
 /// status_report_interval; setting that flag to 0 disables periodic reporting altogether
 /// Regardless of the value of that flag, if a report callback is specified, it is invoked
 /// at least once at the end of execution with an overall status and profile (and 'done'
-/// indicator). The only exception is when execution is cancelled, in which case the
-/// callback is *not* invoked (the coordinator already knows that execution stopped,
-/// because it initiated the cancellation).
-//
+/// indicator).
+///
 /// Aside from Cancel(), which may be called asynchronously, this class is not
 /// thread-safe.
 class PlanFragmentExecutor {
@@ -103,14 +101,19 @@ class PlanFragmentExecutor {
   ///
   /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
   /// Status::CANCELLED;
+  ///
+  /// If Prepare() fails, it will invoke final status callback with the error status.
   Status Prepare(const TExecPlanFragmentParams& request);
 
-  /// Opens the fragment plan and sink. Starts the profile reporting thread, if required.
+  /// Opens the fragment plan and sink. Starts the profile reporting thread, if
+  /// required.  Can be called only if Prepare() succeeded. If Open() fails it will
+  /// invoke the final status callback with the error status.
   Status Open();
 
   /// Executes the fragment by repeatedly driving the sink with batches produced by the
   /// exec node tree. report_status_cb will have been called for the final time when
-  /// Exec() returns, and the status-reporting thread will have been stopped.
+  /// Exec() returns, and the status-reporting thread will have been stopped. Can be
+  /// called only if Open() succeeded.
   Status Exec();
 
   /// Closes the underlying plan fragment and frees up all resources allocated in
@@ -169,7 +172,8 @@ class PlanFragmentExecutor {
 
   /// When the report thread starts, it sets 'report_thread_active_' to true and signals
   /// 'report_thread_started_cv_'. The report thread is shut down by setting
-  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'.
+  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected
+  /// by 'report_thread_lock_'.
   bool report_thread_active_;
 
   /// true if Close() has been called
@@ -178,16 +182,6 @@ class PlanFragmentExecutor {
   /// true if this fragment has not returned the thread token to the thread resource mgr
   bool has_thread_token_;
 
-  /// Overall execution status. Either ok() or set to the first error status that
-  /// was encountered.
-  Status status_;
-
-  /// Protects status_
-  /// lock ordering:
-  /// 1. report_thread_lock_
-  /// 2. status_lock_
-  boost::mutex status_lock_;
-
   /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of
   /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections
   /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
@@ -242,12 +236,6 @@ class PlanFragmentExecutor {
   /// of the execution.
   RuntimeProfile::Counter* average_thread_tokens_;
 
-  /// (Atomic) Flag that indicates whether a completed fragment report has been or will
-  /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
-  /// fragment report is about to be fired. Used for reducing the probability that a
-  /// report is sent twice at the end of the fragment.
-  AtomicInt32 completed_report_sent_;
-
   /// Sampled memory usage at even time intervals.
   RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;
 
@@ -260,22 +248,19 @@ class PlanFragmentExecutor {
   typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 
   /// Main loop of profile reporting thread.
-  /// Exits when notified on done_cv_.
-  /// On exit, *no report is sent*, ie, this will not send the final report.
-  void ReportProfile();
-
-  /// Invoked the report callback if there is a report callback and the current
-  /// status isn't CANCELLED. Sets 'done' to true in the callback invocation if
-  /// done == true or we have an error status.
-  void SendReport(bool done);
-
-  /// If status_.ok(), sets status_ to status.
-  /// If we're transitioning to an error status, stops report thread and
-  /// sends a final report.
-  void UpdateStatus(const Status& status);
-
-  /// Called when the fragment execution is complete to finalize counters.
-  void FragmentComplete();
+  /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to
+  /// false. This will not send the final report.
+  void ReportProfileThread();
+
+  /// Invoked the report callback. If 'done' is true, sends the final report with
+  /// 'status' and the profile. This type of report is sent once and only by the
+  /// instance execution thread.  Otherwise, a profile-only report is sent, which the
+  /// ReportProfileThread() thread will do periodically.
+  void SendReport(bool done, const Status& status);
+
+  /// Called when the fragment execution is complete to finalize counters and send
+  /// the final status report.  Must be called only once.
+  void FragmentComplete(const Status& status);
 
   /// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
   /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index cc56c19..337be82 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -46,25 +46,17 @@ Status FragmentMgr::FragmentExecState::Cancel() {
   return Status::OK();
 }
 
-Status FragmentMgr::FragmentExecState::Prepare() {
+void FragmentMgr::FragmentExecState::Exec() {
   Status status = executor_.Prepare(exec_params_);
-  if (!status.ok()) ReportStatusCb(status, NULL, true);
   prepare_promise_.Set(status);
-  return status;
-}
-
-void FragmentMgr::FragmentExecState::Exec() {
-  if (Prepare().ok()) {
-    executor_.Open();
-    executor_.Exec();
+  if (status.ok()) {
+    if (executor_.Open().ok()) {
+      executor_.Exec();
+    }
   }
   executor_.Close();
 }
 
-// There can only be one of these callbacks in-flight at any moment, because
-// it is only invoked from the executor's reporting thread.
-// Also, the reported status will always reflect the most recent execution status,
-// including the final status when execution finishes.
 void FragmentMgr::FragmentExecState::ReportStatusCb(
     const Status& status, RuntimeProfile* profile, bool done) {
   DCHECK(status.ok() || done);  // if !status.ok() => done

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/56f4d0f5/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h
index c795cd8..1c64f2d 100644
--- a/be/src/service/fragment-exec-state.h
+++ b/be/src/service/fragment-exec-state.h
@@ -83,7 +83,7 @@ class FragmentMgr::FragmentExecState {
   /// if set to anything other than OK, execution has terminated w/ an error
   Status exec_status_;
 
-  /// Set once Prepare() has returned with exec_status_.
+  /// Barrier for the completion of executor_.Prepare().
   Promise<Status> prepare_promise_;
 
   /// Update 'exec_status_' w/ 'status', if the former is not already an error.
@@ -92,14 +92,12 @@ class FragmentMgr::FragmentExecState {
 
   /// Callback for executor; updates exec_status_ if 'status' indicates an error
   /// or if there was a thrift error.
-  ///
   /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of
   /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
   /// for this fragment (e.g. when the fragment has failed during preparation).
+  /// The executor must ensure that there is only one invocation at a time.
   void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
 
-  /// Call Prepare() and create and initialize data sink.
-  Status Prepare();
 };
 
 }


[6/6] incubator-impala git commit: IMPALA-4527: Columns in Kudu tables created from Impala default to "NULL"

Posted by kw...@apache.org.
IMPALA-4527: Columns in Kudu tables created from Impala default to "NULL"

This commit reverts the behavior introduced by IMPALA-3719, which used
the Kudu default behavior for column nullability when none was specified
in the CREATE TABLE statement. With this commit, non-key columns of Kudu
tables created from Impala are nullable by default unless specified
otherwise.

Change-Id: I950d9a9c64e3851e11a641573617790b340ece94
Reviewed-on: http://gerrit.cloudera.org:8080/5259
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/da34ce97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/da34ce97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/da34ce97

Branch: refs/heads/master
Commit: da34ce9780d03b9abcd7adf2cd75d2f57ab97b1d
Parents: b374061
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Tue Nov 29 10:02:35 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Dec 2 02:06:22 2016 +0000

----------------------------------------------------------------------
 .../impala/service/KuduCatalogOpExecutor.java   | 15 ++++++++---
 .../queries/QueryTest/kudu_describe.test        | 26 ++++++++++----------
 .../queries/QueryTest/kudu_stats.test           |  6 ++---
 3 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/da34ce97/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index e3f9a7f..aa553cf 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -97,6 +97,7 @@ public class KuduCatalogOpExecutor {
   private static Schema createTableSchema(TCreateTableParams params)
       throws ImpalaRuntimeException {
     Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names());
+    Preconditions.checkState(!keyColNames.isEmpty());
     List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
     for (TColumn column: params.getColumns()) {
       Type type = Type.fromThrift(column.getColumnType());
@@ -105,9 +106,15 @@ public class KuduCatalogOpExecutor {
       // Create the actual column and check if the column is a key column
       ColumnSchemaBuilder csb =
           new ColumnSchemaBuilder(column.getColumnName(), kuduType);
-      Preconditions.checkState(column.isSetIs_key());
-      csb.key(keyColNames.contains(column.getColumnName()));
-      if (column.isSetIs_nullable()) csb.nullable(column.isIs_nullable());
+      boolean isKey = keyColNames.contains(column.getColumnName());
+      csb.key(isKey);
+      if (column.isSetIs_nullable()) {
+        csb.nullable(column.isIs_nullable());
+      } else if (!isKey) {
+        // Non-key columns are by default nullable unless the user explicitly sets their
+        // nullability.
+        csb.nullable(true);
+      }
       if (column.isSetDefault_value()) {
         csb.defaultValue(KuduUtil.getKuduDefaultValue(column.getDefault_value(), kuduType,
             column.getColumnName()));
@@ -363,7 +370,7 @@ public class KuduCatalogOpExecutor {
       Type type = Type.fromThrift(column.getColumnType());
       Preconditions.checkState(type != null);
       org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
-      boolean isNullable = column.isSetIs_nullable() && column.isIs_nullable();
+      boolean isNullable = !column.isSetIs_nullable() ? true : column.isIs_nullable();
       if (isNullable) {
         if (column.isSetDefault_value()) {
           // See KUDU-1747

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/da34ce97/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
index 280c87d..3d8ac2c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
@@ -4,19 +4,19 @@ describe functional_kudu.alltypes
 ---- LABELS
 NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'bigint_col','bigint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'bool_col','boolean','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'date_string_col','string','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'double_col','double','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'float_col','float','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'bigint_col','bigint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'bool_col','boolean','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'date_string_col','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'double_col','double','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'float_col','float','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 'id','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'int_col','int','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'month','int','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'smallint_col','smallint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'string_col','string','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'timestamp_col','string','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'tinyint_col','tinyint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'year','int','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'int_col','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'month','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'smallint_col','smallint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'string_col','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'timestamp_col','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'tinyint_col','tinyint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'year','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
 STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
@@ -27,7 +27,7 @@ create table describe_test
  pk2 int,
  pk3 string,
  c1 string null default 'abc' comment 'testing',
- c2 int default 100 encoding plain_encoding compression snappy,
+ c2 int not null default 100 encoding plain_encoding compression snappy,
  c3 int null block_size 8388608,
  primary key (pk1, pk2, pk3))
 distribute by hash (pk1) into 3 buckets

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/da34ce97/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
index 3ae4f69..dc8f052 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
@@ -23,9 +23,9 @@ compute stats simple;
 describe simple;
 ---- RESULTS
 'id','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'name','string','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'valf','float','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'vali','bigint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'name','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'valf','float','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'vali','bigint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
 STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====


[2/6] incubator-impala git commit: Bump Kudu python version to 1.1

Posted by kw...@apache.org.
Bump Kudu python version to 1.1

Change-Id: I5834b3aa4eeae363eae938f61e473c52a0fe5596
Reviewed-on: http://gerrit.cloudera.org:8080/5307
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f3fe2cfe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f3fe2cfe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f3fe2cfe

Branch: refs/heads/master
Commit: f3fe2cfe10c70443e7b62ae1e79949ce731588e8
Parents: 1b8fede
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Thu Dec 1 09:07:51 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Dec 1 23:11:49 2016 +0000

----------------------------------------------------------------------
 infra/python/deps/download_requirements | 2 +-
 infra/python/deps/requirements.txt      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3fe2cfe/infra/python/deps/download_requirements
----------------------------------------------------------------------
diff --git a/infra/python/deps/download_requirements b/infra/python/deps/download_requirements
index e068c34..2ef8922 100755
--- a/infra/python/deps/download_requirements
+++ b/infra/python/deps/download_requirements
@@ -29,5 +29,5 @@ PY26="$(./find_py26.py)"
 "$PY26" pip_download.py virtualenv 13.1.0
 # kudu-python is downloaded separately because pip install attempts to execute a
 # setup.py subcommand for kudu-python that can fail even if the download succeeds.
-"$PY26" pip_download.py kudu-python 0.3.0
+"$PY26" pip_download.py kudu-python 1.1.0
 popd

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3fe2cfe/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index dcbb91c..c3b8648 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -82,7 +82,7 @@ texttable == 0.8.3
 # functional and determines the expected kudu-python version. The version must be listed
 # in the format below including # and spacing. Keep this formatting! The kudu-python
 # version in download_requirements must be kept in sync with this version.
-# kudu-python==0.3.0
+# kudu-python==1.1.0
   Cython == 0.23.4
   numpy == 1.10.4