You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/12 21:30:55 UTC

[1/5] impala git commit: IMPALA-6495: fix targeted-perf for new column alias syntax

Repository: impala
Updated Branches:
  refs/heads/2.x bf892858a -> c3e993327


IMPALA-6495: fix targeted-perf for new column alias syntax

I was able to run bin/single_node_perf_run.py after these changes.

Change-Id: Ib25139873b1c67f8039ac6c85e936135e008101b
Reviewed-on: http://gerrit.cloudera.org:8080/9268
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1e17aba7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1e17aba7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1e17aba7

Branch: refs/heads/2.x
Commit: 1e17aba7e78af7b0561513c18f5cac46c564e5c4
Parents: c53b8ad
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Feb 7 08:46:45 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 9 17:10:16 2018 +0000

----------------------------------------------------------------------
 .../primitive_shuffle_join_one_to_many_string_with_groupby.test    | 2 +-
 .../queries/primitive_shuffle_join_union_all_with_groupby.test     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1e17aba7/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_one_to_many_string_with_groupby.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_one_to_many_string_with_groupby.test b/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_one_to_many_string_with_groupby.test
index fd90d8a..3ca78cf 100644
--- a/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_one_to_many_string_with_groupby.test
+++ b/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_one_to_many_string_with_groupby.test
@@ -11,7 +11,7 @@ JOIN /* +shuffle */
   ( SELECT upper(concat(cast(o_orderkey AS string),'bla')) o_orderkey_string
    FROM orders) o ON l.l_orderkey_string = o.o_orderkey_string
 GROUP BY o.o_orderkey_string
-HAVING cnt = 999999;
+HAVING count(*) = 999999;
 ---- RESULTS
 ---- TYPES
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/1e17aba7/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_union_all_with_groupby.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_union_all_with_groupby.test b/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_union_all_with_groupby.test
index 6295274..4aebb80 100644
--- a/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_union_all_with_groupby.test
+++ b/testdata/workloads/targeted-perf/queries/primitive_shuffle_join_union_all_with_groupby.test
@@ -20,7 +20,7 @@ FROM (
     GROUP BY l_orderkey
     ) a
 GROUP BY l_orderkey
-HAVING ROWCOUNT = 99999999;
+HAVING count(*) = 99999999;
 ---- RESULTS
 ---- TYPES
 ====


[5/5] impala git commit: IMPALA-6396: Exchange node's memory usage should include its receiver's

Posted by ta...@apache.org.
IMPALA-6396: Exchange node's memory usage should include its receiver's

A DataStreamRecvr is co-owned by the DataStreamMgr and
an Exchange node. However, the life time of the memory
allocations (e.g. row batches) of a DataStreamRecvr never
exceeds that of its owning Exchange node. Previously, we
used the fragment instance's MemTracker as the parent of
the DataStreamRecvr's MemTracker. This change switches to
using the MemTracker of the owning Exchange node as the
parent tracker of the DataStreamRecvr. This makes it
easier to identify the peak memory usage of the receivers
of different exchange nodes in the runtime profile and
query summary. Most of the exchange node's memory usage
is from its receiver so we don't track the peak memory
usage of the receiver separately.

Sample output from TPCH-Q21:

EXCHANGE_NODE (id=18):(Total: 1s448ms, non-child: 265.818ms, % non-child: 18.35%)
   - ConvertRowBatchTime: 223.895ms
   - PeakMemoryUsage: 10.04 MB (10524943)
   - RowsReturned: 1.27M (1267464)
   - RowsReturnedRate: 875.19 K/sec
  RecvrSide:
    BytesReceived(500.000ms): 0, 1.64 MB, 9.98 MB, 9.98 MB, 10.01 MB, 10.01 MB, 10.01 MB, 31.79 MB, 60.19 MB, 87.84 MB
     - FirstBatchArrivalWaitTime: 0.000ns
     - TotalBytesReceived: 93.07 MB (97594728)
     - TotalGetBatchTime: 1s194ms
       - DataArrivalTimer: 1s183ms
   SenderSide:
      - DeserializeRowBatchTime: 344.343ms
      - NumBatchesAccepted: 3.80K (3796)
      - NumBatchesDeferred: 5 (5)
      - NumEarlySenders: 0 (0)

Testing done: Updated test_observability.py to verify the
peak memory usage of exchange node is not 0.

Change-Id: I8ca3c47d87bfcd221d34565eda1878f3c15d5c45
Reviewed-on: http://gerrit.cloudera.org:8080/9202
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c3e99332
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c3e99332
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c3e99332

Branch: refs/heads/2.x
Commit: c3e993327365ca90d186fe534294c5c5f07de156
Parents: 1e17aba
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 1 13:56:31 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 10 04:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc           |  6 +++---
 be/src/runtime/data-stream-mgr-base.h  |  9 +++++----
 be/src/runtime/data-stream-mgr.cc      | 17 ++++++++---------
 be/src/runtime/data-stream-mgr.h       | 11 ++++++-----
 be/src/runtime/data-stream-test.cc     | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.cc | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.h  | 11 ++++++-----
 tests/query_test/test_observability.py |  2 ++
 8 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 353a59b..cc39382 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -81,9 +81,9 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
-  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
-      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
-      FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
+  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(&input_row_desc_,
+      state->fragment_instance_id(), id_, num_senders_,
+      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker());
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index 0e392e3..f9761cb 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -26,6 +26,7 @@
 namespace impala {
 
 class DataStreamRecvrBase;
+class MemTracker;
 class RuntimeProfile;
 class RuntimeState;
 class TRowBatch;
@@ -43,10 +44,10 @@ class DataStreamMgrBase : public CacheLineAligned {
   virtual ~DataStreamMgrBase() { }
 
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) = 0;
+  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) = 0;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 45eee7f..48a819c 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -75,17 +75,16 @@ inline uint32_t DataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-    RuntimeProfile* profile, bool is_merging) {
-  DCHECK(profile != NULL);
+shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const RowDescriptor* row_desc,
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+    int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
+  DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="
             << fragment_instance_id << ", node=" << dest_node_id;
-  shared_ptr<DataStreamRecvr> recvr(
-      new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
-          profile));
+  shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(this, parent_tracker, row_desc,
+      fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
   lock_guard<mutex> l(lock_);
   fragment_recvr_set_.insert(make_pair(fragment_instance_id, dest_node_id));

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 07f7c56..2be6478 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -71,13 +71,14 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender instance

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 07eefd4..75d5ac9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -415,8 +415,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-        instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
+    info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+        num_senders, buffer_size, is_merging, profile, &tracker_);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
     } else {
@@ -767,9 +767,8 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
-      false);
+  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
+      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).
@@ -832,8 +831,8 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
   receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
   ReceiverInfo& info = receiver_info_.back();
-  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+      4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
       &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, &info);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index fabea13..91111dc 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -94,16 +94,15 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
-    RuntimeState* state, const RowDescriptor* row_desc,
-    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
-    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-
+    const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
+    int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
   DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="<< finst_id
             << ", node=" << dest_node_id;
-  shared_ptr<KrpcDataStreamRecvr> recvr(
-      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
+  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(this, parent_tracker,
+      row_desc, finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
   EarlySendersList early_senders_for_recvr;
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 458ebe7..16c0b30 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -236,13 +236,14 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Handler for TransmitData() RPC.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 85fc4f1..9c0bb65 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -38,6 +38,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
     assert result.exec_summary[0]['num_rows'] == 5
     assert result.exec_summary[0]['est_num_rows'] == 5
+    assert result.exec_summary[0]['peak_mem'] > 0
 
     for line in result.runtime_profile.split('\n'):
       # The first 'RowsProduced' we find is for the coordinator fragment.
@@ -55,6 +56,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
     assert result.exec_summary[5]['num_rows'] == 25
     assert result.exec_summary[5]['est_num_rows'] == 25
+    assert result.exec_summary[5]['peak_mem'] > 0
 
   @SkipIfS3.hbase
   @SkipIfLocal.hbase


[3/5] impala git commit: IMPALA-6486: Fix INVALIDATE METADATA hang after statestore restart

Posted by ta...@apache.org.
IMPALA-6486: Fix INVALIDATE METADATA hang after statestore restart

This commit fixes an issue where an INVALIDATE METADATA statement will
hang if executed after a statestore restart. The issue was that the
CatalogObjectVersionQueue was not properly reset when a non-delta
catalog topic update was received in an impalad, causing it to record
duplicate catalog versions and, hence, not properly advancing the minimum
catalog object watermark. The latter is used by INVALIDATE METADATA to
determine when the effects of that operation have been applied to the
local impalad catalog cache.

Change-Id: Icf3ee31c28884ed79c5dbc700ccd4ef761015c68
Reviewed-on: http://gerrit.cloudera.org:8080/9232
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cdbacb1c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cdbacb1c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cdbacb1c

Branch: refs/heads/2.x
Commit: cdbacb1c510eb54c9a03b9a37ae96012f15e5629
Parents: bf89285
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Tue Feb 6 15:11:53 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 9 17:10:16 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/CatalogObjectVersionQueue.java  | 2 ++
 .../main/java/org/apache/impala/catalog/ImpaladCatalog.java   | 7 +++----
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cdbacb1c/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
index 5fcd398..e24e66b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
@@ -70,4 +70,6 @@ public class CatalogObjectVersionQueue {
       removeVersion(catalogObject.getCatalogVersion());
     }
   }
+
+  public void clear() { objectVersions_.clear(); }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/cdbacb1c/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 79960e4..99bd23e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -97,13 +97,12 @@ public class ImpaladCatalog extends Catalog {
   // Used during table creation.
   private final String defaultKuduMasterHosts_;
 
-  /**
-   * C'tor used by tests that need to validate the ImpaladCatalog outside of the
-   * CatalogServer.
-   */
   public ImpaladCatalog(String defaultKuduMasterHosts) {
     super();
     defaultKuduMasterHosts_ = defaultKuduMasterHosts;
+    // Ensure the contents of the CatalogObjectVersionQueue instance are cleared when a
+    // new instance of ImpaladCatalog is created (see IMPALA-6486).
+    CatalogObjectVersionQueue.INSTANCE.clear();
   }
 
   /**


[2/5] impala git commit: IMPALA-6219: Use AES-GCM for spill-to-disk encryption

Posted by ta...@apache.org.
IMPALA-6219: Use AES-GCM for spill-to-disk encryption

AES-GCM can be very fast(~10 times faster than CFB+SHA256), but it
requires an instruction that Impala can currently run without (CLMUL).
In order to be fast, we dispatch to GCM mode at run-time based on the
CPU and OpenSSL version.

Testing:
run runtime tmp-file-mgr-test, openssl-util-test, buffer-pool-test
and buffered-tuple-stream-test.
add two cases GcmIntegrity & EncryptoArbitraryLength for
openssl-util-test

Change-Id: Ia188a0c5b74e4a22fb30f8c12f65e0469eb75f6b
Reviewed-on: http://gerrit.cloudera.org:8080/9238
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c53b8ad0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c53b8ad0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c53b8ad0

Branch: refs/heads/2.x
Commit: c53b8ad063300fca98a2cb9e34a9e82bcb1b10e6
Parents: 915a658
Author: Xianda Ke <ke...@gmail.com>
Authored: Wed Feb 7 23:23:00 2018 +0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 9 17:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/tmp-file-mgr.cc   |  15 +++--
 be/src/util/cpu-info.cc          |  13 +++--
 be/src/util/cpu-info.h           |  13 +++--
 be/src/util/openssl-util-test.cc |  95 ++++++++++++++++++++----------
 be/src/util/openssl-util.cc      | 106 ++++++++++++++++++++++++++++++----
 be/src/util/openssl-util.h       |  70 +++++++++++++++-------
 6 files changed, 233 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index d35d302..3807670 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -612,19 +612,26 @@ void TmpFileMgr::WriteHandle::WaitForWrite() {
 Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
   DCHECK(FLAGS_disk_spill_encryption);
   SCOPED_TIMER(encryption_timer_);
-  // Since we're using AES-CTR/AES-CFB mode, we must take care not to reuse a
+  // Since we're using GCM/CTR/CFB mode, we must take care not to reuse a
   // key/IV pair. Regenerate a new key and IV for every data buffer we write.
   key_.InitializeRandom();
   RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));
-  hash_.Compute(buffer.data(), buffer.len());
+
+  if (!key_.IsGcmMode()) {
+    hash_.Compute(buffer.data(), buffer.len());
+  }
   return Status::OK();
 }
 
 Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
   DCHECK(FLAGS_disk_spill_encryption);
   SCOPED_TIMER(encryption_timer_);
-  if (!hash_.Verify(buffer.data(), buffer.len())) {
-    return Status("Block verification failure");
+
+  // GCM mode will verify the integrity by itself
+  if (!key_.IsGcmMode()) {
+    if (!hash_.Verify(buffer.data(), buffer.len())) {
+      return Status("Block verification failure");
+    }
   }
   return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index a32571e..1e3fcde 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -85,12 +85,13 @@ static struct {
   int64_t flag;
 } flag_mappings[] =
 {
-  { "ssse3",  CpuInfo::SSSE3 },
-  { "sse4_1", CpuInfo::SSE4_1 },
-  { "sse4_2", CpuInfo::SSE4_2 },
-  { "popcnt", CpuInfo::POPCNT },
-  { "avx",    CpuInfo::AVX },
-  { "avx2",   CpuInfo::AVX2 },
+  { "ssse3",     CpuInfo::SSSE3 },
+  { "sse4_1",    CpuInfo::SSE4_1 },
+  { "sse4_2",    CpuInfo::SSE4_2 },
+  { "popcnt",    CpuInfo::POPCNT },
+  { "avx",       CpuInfo::AVX },
+  { "avx2",      CpuInfo::AVX2 },
+  { "pclmuldqd", CpuInfo::PCLMULQDQ }
 };
 static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index 38d6782..e60babc 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -34,12 +34,13 @@ namespace impala {
 /// /sys/devices)
 class CpuInfo {
  public:
-  static const int64_t SSSE3   = (1 << 1);
-  static const int64_t SSE4_1  = (1 << 2);
-  static const int64_t SSE4_2  = (1 << 3);
-  static const int64_t POPCNT  = (1 << 4);
-  static const int64_t AVX     = (1 << 5);
-  static const int64_t AVX2    = (1 << 6);
+  static const int64_t SSSE3     = (1 << 1);
+  static const int64_t SSE4_1    = (1 << 2);
+  static const int64_t SSE4_2    = (1 << 3);
+  static const int64_t POPCNT    = (1 << 4);
+  static const int64_t AVX       = (1 << 5);
+  static const int64_t AVX2      = (1 << 6);
+  static const int64_t PCLMULQDQ = (1 << 7);
 
   /// Cache enums for L1 (data), L2 and L3
   enum CacheLevel {

http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/util/openssl-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util-test.cc b/be/src/util/openssl-util-test.cc
index 8d98b0d..76f65a5 100644
--- a/be/src/util/openssl-util-test.cc
+++ b/be/src/util/openssl-util-test.cc
@@ -44,6 +44,41 @@ class OpenSSLUtilTest : public ::testing::Test {
     }
   }
 
+  /// Fill arbitrary-length buffer with random bytes
+  void GenerateRandomBytes(uint8_t* data, int64_t len) {
+    DCHECK_GE(len, 0);
+    for (int64_t i = 0; i < len; i++) {
+      data[i] = uniform_int_distribution<uint8_t>(0, UINT8_MAX)(rng_);
+    }
+  }
+
+  void TestEncryptionDecryption(const int64_t buffer_size) {
+    vector<uint8_t> original(buffer_size);
+    vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
+    if (buffer_size % 8 == 0) {
+      GenerateRandomData(original.data(), buffer_size);
+    } else {
+      GenerateRandomBytes(original.data(), buffer_size);
+    }
+
+    // Check all the modes
+    AES_CIPHER_MODE modes[] = {AES_256_GCM, AES_256_CTR, AES_256_CFB};
+    for (auto m : modes) {
+      memcpy(scratch.data(), original.data(), buffer_size);
+
+      EncryptionKey key;
+      key.InitializeRandom();
+      key.SetCipherMode(m);
+
+      ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
+      // Check that encryption did something
+      ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
+      ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
+      // Check that we get the original data back.
+      ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
+    }
+  }
+
   mt19937_64 rng_;
 };
 
@@ -57,7 +92,7 @@ TEST_F(OpenSSLUtilTest, Encryption) {
   GenerateRandomData(original.data(), buffer_size);
 
   // Check both CTR & CFB
-  AES_CIPHER_MODE modes[] = {AES_256_CTR, AES_256_CFB};
+  AES_CIPHER_MODE modes[] = {AES_256_GCM, AES_256_CTR, AES_256_CFB};
   for (auto m : modes) {
     // Iterate multiple times to ensure that key regeneration works correctly.
     EncryptionKey key;
@@ -85,44 +120,42 @@ TEST_F(OpenSSLUtilTest, Encryption) {
 /// Test that encryption and decryption work in-place.
 TEST_F(OpenSSLUtilTest, EncryptInPlace) {
   const int buffer_size = 1024 * 1024;
-  vector<uint8_t> original(buffer_size);
-  vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
-
-  EncryptionKey key;
-  // Check both CTR & CFB
-  AES_CIPHER_MODE modes[] = {AES_256_CTR, AES_256_CFB};
-  for (auto m : modes) {
-    GenerateRandomData(original.data(), buffer_size);
-    memcpy(scratch.data(), original.data(), buffer_size);
-
-    key.InitializeRandom();
-    key.SetCipherMode(m);
-
-    ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
-    // Check that encryption did something
-    ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
-    ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
-    // Check that we get the original data back.
-    ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
-  }
+  TestEncryptionDecryption(buffer_size);
 }
 
 /// Test that encryption works with buffer lengths that don't fit in a 32-bit integer.
 TEST_F(OpenSSLUtilTest, EncryptInPlaceHugeBuffer) {
   const int64_t buffer_size = 3 * 1024L * 1024L * 1024L;
-  vector<uint8_t> original(buffer_size);
-  vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
-  GenerateRandomData(original.data(), buffer_size);
-  memcpy(scratch.data(), original.data(), buffer_size);
+  TestEncryptionDecryption(buffer_size);
+}
+
+/// Test that encryption works with arbitrary-length buffer
+TEST_F(OpenSSLUtilTest, EncryptArbitraryLength) {
+  std::uniform_int_distribution<uint64_t> dis(0, 1024 * 1024);
+  const int buffer_size = dis(rng_);
+  TestEncryptionDecryption(buffer_size);
+}
+
+/// Test integrity in GCM mode
+TEST_F(OpenSSLUtilTest, GcmIntegrity) {
+  const int buffer_size = 1024 * 1024;
+  vector<uint8_t> buffer(buffer_size);
 
   EncryptionKey key;
   key.InitializeRandom();
-  ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
-  // Check that encryption did something
-  ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
-  ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
-  // Check that we get the original data back.
-  ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
+  key.SetCipherMode(AES_256_GCM);
+
+  // Even it has been set as GCM mode, it may fall back to other modes.
+  // Check if GCM mode is supported at runtime.
+  if (key.IsGcmMode()) {
+    GenerateRandomData(buffer.data(), buffer_size);
+    ASSERT_OK(key.Encrypt(buffer.data(), buffer_size, buffer.data()));
+
+    // tamper the data
+    ++buffer[0];
+    Status s = key.Decrypt(buffer.data(), buffer_size, buffer.data());
+    EXPECT_STR_CONTAINS(s.GetDetail(), "EVP_DecryptFinal");
+  }
 }
 
 /// Test basic integrity hash functionality.

http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/util/openssl-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.cc b/be/src/util/openssl-util.cc
index 69dc676..83cd8fd 100644
--- a/be/src/util/openssl-util.cc
+++ b/be/src/util/openssl-util.cc
@@ -20,6 +20,7 @@
 #include <limits.h>
 #include <sstream>
 
+#include <glog/logging.h>
 #include <openssl/err.h>
 #include <openssl/evp.h>
 #include <openssl/rand.h>
@@ -30,12 +31,32 @@
 #include "gutil/strings/substitute.h"
 
 #include "common/names.h"
+#include "cpu-info.h"
 
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
 DECLARE_string(ssl_cipher_list);
 
+/// OpenSSL 1.0.1d
+#define OPENSSL_VERSION_1_0_1D 0x1000104fL
+
+/// If not defined at compile time, define them manually
+/// see: openssl/evp.h
+#ifndef EVP_CIPH_GCM_MODE
+#define EVP_CTRL_GCM_SET_IVLEN 0x9
+#define EVP_CTRL_GCM_GET_TAG 0x10
+#define EVP_CTRL_GCM_SET_TAG 0x11
+#endif
+
+extern "C" {
+ATTRIBUTE_WEAK
+const EVP_CIPHER* EVP_aes_256_ctr();
+
+ATTRIBUTE_WEAK
+const EVP_CIPHER* EVP_aes_256_gcm();
+}
+
 namespace impala {
 
 // Counter to track the number of encryption keys generated. Incremented before each key
@@ -107,19 +128,20 @@ void EncryptionKey::InitializeRandom() {
   }
   RAND_bytes(key_, sizeof(key_));
   RAND_bytes(iv_, sizeof(iv_));
+  memset(gcm_tag_, 0, sizeof(gcm_tag_));
   initialized_ = true;
 }
 
-Status EncryptionKey::Encrypt(const uint8_t* data, int64_t len, uint8_t* out) const {
+Status EncryptionKey::Encrypt(const uint8_t* data, int64_t len, uint8_t* out) {
   return EncryptInternal(true, data, len, out);
 }
 
-Status EncryptionKey::Decrypt(const uint8_t* data, int64_t len, uint8_t* out) const {
+Status EncryptionKey::Decrypt(const uint8_t* data, int64_t len, uint8_t* out) {
   return EncryptInternal(false, data, len, out);
 }
 
 Status EncryptionKey::EncryptInternal(
-    bool encrypt, const uint8_t* data, int64_t len, uint8_t* out) const {
+    bool encrypt, const uint8_t* data, int64_t len, uint8_t* out) {
   DCHECK(initialized_);
   DCHECK_GE(len, 0);
   // Create and initialize the context for encryption
@@ -127,6 +149,10 @@ Status EncryptionKey::EncryptInternal(
   EVP_CIPHER_CTX_init(&ctx);
   EVP_CIPHER_CTX_set_padding(&ctx, 0);
 
+  if (IsGcmMode()) {
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_SET_IVLEN, AES_BLOCK_SIZE, NULL);
+  }
+
   // Start encryption/decryption.  We use a 256-bit AES key, and the cipher block mode
   // is either CTR or CFB(stream cipher), both of which support arbitrary length
   // ciphertexts - it doesn't have to be a multiple of 16 bytes. Additionally, CTR
@@ -157,6 +183,11 @@ Status EncryptionKey::EncryptInternal(
     offset += in_len;
   }
 
+  if (IsGcmMode() && !encrypt) {
+    // Set expected tag value
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_SET_TAG, AES_BLOCK_SIZE, gcm_tag_);
+  }
+
   // Finalize encryption or decryption.
   int final_out_len;
   success = encrypt ? EVP_EncryptFinal_ex(&ctx, out + offset, &final_out_len) :
@@ -164,21 +195,74 @@ Status EncryptionKey::EncryptInternal(
   if (success != 1) {
     return OpenSSLErr(encrypt ? "EVP_EncryptFinal" : "EVP_DecryptFinal");
   }
-  // Again safe due to CTR/CFB with no padding
+
+  if (IsGcmMode() && encrypt) {
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_GET_TAG, AES_BLOCK_SIZE, gcm_tag_);
+  }
+
+  // Again safe due to GCM/CTR/CFB with no padding
   DCHECK_EQ(final_out_len, 0);
   return Status::OK();
 }
 
-extern "C" {
-ATTRIBUTE_WEAK
-const EVP_CIPHER* EVP_aes_256_ctr();
-}
-
 const EVP_CIPHER* EncryptionKey::GetCipher() const {
   // use weak symbol to avoid compiling error on OpenSSL 1.0.0 environment
-  if (mode_ == AES_256_CTR && EVP_aes_256_ctr) return EVP_aes_256_ctr();
+  if (mode_ == AES_256_CTR) return EVP_aes_256_ctr();
+  if (mode_ == AES_256_GCM) return EVP_aes_256_gcm();
 
-  // otherwise, fallback to CFB mode
   return EVP_aes_256_cfb();
 }
+
+void EncryptionKey::SetCipherMode(AES_CIPHER_MODE m) {
+  mode_ = m;
+
+  if (!IsModeSupported(m)) {
+    mode_ = GetSupportedDefaultMode();
+    LOG(WARNING) << Substitute("$0 is not supported, fall back to $1.",
+        ModeToString(m), ModeToString(mode_));
+  }
+}
+
+bool EncryptionKey::IsModeSupported(AES_CIPHER_MODE m) const {
+  switch (m) {
+    case AES_256_GCM:
+      // It becomes a bit tricky for GCM mode, because GCM mode is enabled since
+      // OpenSSL 1.0.1, but the tag validation only works since 1.0.1d. We have
+      // to make sure that OpenSSL version >= 1.0.1d for GCM. So we need
+      // SSLeay(). Note that SSLeay() may return the compiling version on
+      // certain platforms if it was built against an older version(see:
+      // IMPALA-6418). In this case, it will return false, and EncryptionKey
+      // will try to fall back to CTR mode, so it is not ideal but is OK to use
+      // SSLeay() for GCM mode here since in the worst case, we will be using
+      // AES_256_CTR in a system that supports AES_256_GCM.
+      return (CpuInfo::IsSupported(CpuInfo::PCLMULQDQ)
+          && SSLeay() >= OPENSSL_VERSION_1_0_1D && EVP_aes_256_gcm);
+
+    case AES_256_CTR:
+      // If TLS1.2 is supported, then we're on a verison of OpenSSL that
+      // supports AES-256-CTR.
+      return (MaxSupportedTlsVersion() >= TLS1_2_VERSION && EVP_aes_256_ctr);
+
+    case AES_256_CFB:
+      return true;
+
+    default:
+      return false;
+  }
+}
+
+AES_CIPHER_MODE EncryptionKey::GetSupportedDefaultMode() const {
+  if (IsModeSupported(AES_256_GCM)) return AES_256_GCM;
+  if (IsModeSupported(AES_256_CTR)) return AES_256_CTR;
+  return AES_256_CFB;
+}
+
+const string EncryptionKey::ModeToString(AES_CIPHER_MODE m) const {
+  switch(m) {
+    case AES_256_GCM: return "AES-GCM";
+    case AES_256_CTR: return "AES-CTR";
+    case AES_256_CFB: return "AES-CFB";
+  }
+  return "Unknown mode";
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/c53b8ad0/be/src/util/openssl-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.h b/be/src/util/openssl-util.h
index 7b1b28e..ef53425 100644
--- a/be/src/util/openssl-util.h
+++ b/be/src/util/openssl-util.h
@@ -60,9 +60,9 @@ bool IsExternalTlsConfigured();
 void SeedOpenSSLRNG();
 
 enum AES_CIPHER_MODE {
-  AES_256_CTR,
   AES_256_CFB,
-  AES_256_GCM // not supported now.
+  AES_256_CTR,
+  AES_256_GCM
 };
 
 /// The hash of a data buffer used for checking integrity. A SHA256 hash is used
@@ -83,43 +83,56 @@ class IntegrityHash {
 /// The key and initialization vector (IV) required to encrypt and decrypt a buffer of
 /// data. This should be regenerated for each buffer of data.
 ///
-/// We use AES with a 256-bit key and CTR/CFB cipher block mode, which gives us a stream
-/// cipher that can support arbitrary-length ciphertexts. If OpenSSL version at runtime
-/// is 1.0.1 or above, CTR mode is used, otherwise CFB mode is used. The IV is used as
+/// We use AES with a 256-bit key and GCM/CTR/CFB cipher block mode, which gives us a
+/// stream cipher that can support arbitrary-length ciphertexts. The mode is chosen
+/// depends on the OpenSSL version & the hardware support at runtime. The IV is used as
 /// an input to the cipher as the "block to supply before the first block of plaintext".
 /// This is required because all ciphers (except the weak ECB) are built such that each
 /// block depends on the output from the previous block. Since the first block doesn't
 /// have a previous block, we supply this IV. Think of it  as starting off the chain of
 /// encryption.
+///
+/// Notes for GCM:
+/// (1) GCM mode was supported since OpenSSL 1.0.1, however the tag verification
+/// in decryption was only supported since OpenSSL 1.0.1d.
+/// (2) The plaintext and the Additional Authenticated Data(AAD) are the two
+/// categories of data that GCM protects. GCM protects the authenticity of the
+/// plaintext and the AAD, and GCM also protects the confidentiality of the
+/// plaintext. The AAD itself is not required or won't change the security.
+/// In our case(Spill to Disk), we just ignore the AAD.
+
 class EncryptionKey {
  public:
-  EncryptionKey() : initialized_(false) {
-    // If TLS1.2 is supported, then we're on a verison of OpenSSL that supports
-    // AES-256-CTR.
-    mode_ = MaxSupportedTlsVersion() < TLS1_2_VERSION ? AES_256_CFB : AES_256_CTR;
-  }
-
-  /// Initialize a key for temporary use with randomly generated data. Reinitializes with
-  /// new random values if the key was already initialized. We use AES-CTR/AES-CFB mode
-  /// so key/IV pairs should not be reused. This function automatically reseeds the RNG
-  /// periodically, so callers do not need to do it.
+  EncryptionKey() : initialized_(false) { mode_ = GetSupportedDefaultMode(); }
+
+  /// Initializes a key for temporary use with randomly generated data, and clears the
+  /// tag for GCM mode. Reinitializes with new random values if the key was already
+  /// initialized. We use AES-GCM/AES-CTR/AES-CFB mode so key/IV pairs should not be
+  /// reused. This function automatically reseeds the RNG periodically, so callers do
+  /// not need to do it.
   void InitializeRandom();
 
   /// Encrypts a buffer of input data 'data' of length 'len' into an output buffer 'out'.
   /// Exactly 'len' bytes will be written to 'out'. This key must be initialized before
   /// calling. Operates in-place if 'in' == 'out', otherwise the buffers must not overlap.
-  Status Encrypt(const uint8_t* data, int64_t len, uint8_t* out) const WARN_UNUSED_RESULT;
+  /// For GCM mode, the hash tag will be kept inside(gcm_tag_ variable).
+  Status Encrypt(const uint8_t* data, int64_t len, uint8_t* out) WARN_UNUSED_RESULT;
 
   /// Decrypts a buffer of input data 'data' of length 'len' that was encrypted with this
   /// key into an output buffer 'out'. Exactly 'len' bytes will be written to 'out'.
   /// This key must be initialized before calling. Operates in-place if 'in' == 'out',
-  /// otherwise the buffers must not overlap.
-  Status Decrypt(const uint8_t* data, int64_t len, uint8_t* out) const WARN_UNUSED_RESULT;
+  /// otherwise the buffers must not overlap. For GCM mode, the hash tag, which is
+  /// computed during encryption, will be used for intgerity verification.
+  Status Decrypt(const uint8_t* data, int64_t len, uint8_t* out) WARN_UNUSED_RESULT;
 
   /// Specify a cipher mode. Currently used only for testing but maybe in future we
   /// can provide a configuration option for the end user who can choose a preferred
   /// mode(GCM, CTR, CFB...) based on their software/hardware environment.
-  void SetCipherMode(AES_CIPHER_MODE m) { mode_ = m; }
+  /// If not supported, fall back to the supported mode at runtime.
+  void SetCipherMode(AES_CIPHER_MODE m);
+
+  /// If is GCM mode at runtime
+  bool IsGcmMode() const { return mode_ == AES_256_GCM; }
 
  private:
   /// Helper method that encrypts/decrypts if 'encrypt' is true/false respectively.
@@ -128,13 +141,25 @@ class EncryptionKey {
   /// This key must be initialized before calling. Operates in-place if 'in' == 'out',
   /// otherwise the buffers must not overlap.
   Status EncryptInternal(bool encrypt, const uint8_t* data, int64_t len,
-      uint8_t* out) const WARN_UNUSED_RESULT;
+      uint8_t* out) WARN_UNUSED_RESULT;
+
+  /// Check if mode m is supported at runtime
+  bool IsModeSupported(AES_CIPHER_MODE m) const;
+
+  /// Returns the a default mode which is supported at runtime. If GCM mode
+  /// is supported, return AES_256_GCM as the default. If GCM is not supported,
+  /// but CTR is still supported, return AES_256_CTR. When both GCM and
+  /// CTR modes are not supported, return AES_256_CFB.
+  AES_CIPHER_MODE GetSupportedDefaultMode() const;
+
+  /// Converts mode type to string.
+  const string ModeToString(AES_CIPHER_MODE m) const;
 
   /// Track whether this key has been initialized, to avoid accidentally using
   /// uninitialized keys.
   bool initialized_;
 
-  /// return a EVP_CIPHER according to cipher mode at runtime
+  /// Returns a EVP_CIPHER according to cipher mode at runtime
   const EVP_CIPHER* GetCipher() const;
 
   /// An AES 256-bit key.
@@ -143,6 +168,9 @@ class EncryptionKey {
   /// An initialization vector to feed as the first block to AES.
   uint8_t iv_[AES_BLOCK_SIZE];
 
+  /// Tag for GCM mode
+  uint8_t gcm_tag_[AES_BLOCK_SIZE];
+
   /// Cipher Mode
   AES_CIPHER_MODE mode_;
 };


[4/5] impala git commit: IMPALA-6477: Disable rpc-mgr-kerberized-test

Posted by ta...@apache.org.
IMPALA-6477: Disable rpc-mgr-kerberized-test

This test is breaking builds on CentOS 6.4. Disable it until we
find out the root cause.

Change-Id: I5654d8c41b130544a8f9bc38f69433fcc171eca0
Reviewed-on: http://gerrit.cloudera.org:8080/9256
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/915a6587
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/915a6587
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/915a6587

Branch: refs/heads/2.x
Commit: 915a6587abbd112415d89e5ca9955618969ea046
Parents: cdbacb1
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Thu Feb 8 09:15:48 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 9 17:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-mgr-kerberized-test.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/915a6587/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 57ed3eb..08c1971 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -51,10 +51,11 @@ class RpcMgrKerberizedTest :
   boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
 };
 
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
+// TODO: IMPALA-6477: This test breaks on CentOS 6.4. Re-enable after a fix.
+/*INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
                         RpcMgrKerberizedTest,
                         ::testing::Values(USE_KUDU_KERBEROS,
-                                          USE_IMPALA_KERBEROS));
+                                          USE_IMPALA_KERBEROS));*/
 
 TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring