You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/03/27 17:22:43 UTC

[impala] 01/02: IMPALA-8005: Randomize partitioning exchanges.

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit df6196e064bc7453bee8c7e644bb591391ee3ce2
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Wed Mar 18 12:45:45 2020 -0700

    IMPALA-8005: Randomize partitioning exchanges.
    
    Currently, we use the same hash seed for partitioning exchanges at
    the sender. For a table whose shuffling keys have a skewed
    distribution, multiple queries using the same shuffling keys for
    exchanges will end up hashing to the same destination fragments
    running on a particular host, potentially overloading that host.
    
    This patch seeds the hash with the query id (the seed constant is
    XORed with query_id.hi). This ensures that partitioning exchanges
    in different queries do not always hash the same shuffling keys to
    the same destination.
    
    Testing:
    Added a test to data-stream-test to verify that the data values at
    the destinations differ between queries with different query ids.
    
    Change-Id: I1936e6cc3e8d66420a5a9301f49221ca38f3e468
    Reviewed-on: http://gerrit.cloudera.org:8080/15497
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/data-stream-test.cc        | 86 ++++++++++++++++++++++++++++---
 be/src/runtime/krpc-data-stream-sender.cc |  7 ++-
 be/src/runtime/krpc-data-stream-sender.h  | 10 +++-
 3 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 863db94..3ac64bd 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -50,6 +50,7 @@
 #include "util/parse-util.h"
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
+#include "util/uid-util.h"
 #include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/Descriptors_types.h"
@@ -61,6 +62,8 @@
 
 #include "common/names.h"
 
+using boost::uuids::random_generator;
+using boost::uuids::uuid;
 using namespace impala;
 using namespace apache::thrift;
 using namespace apache::thrift::protocol;
@@ -478,7 +481,7 @@ class DataStreamTest : public testing::Test {
           // hash-partitioned streams send values to the right partition
           int64_t value = *j;
           uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
-              KrpcDataStreamSender::EXCHANGE_HASH_SEED);
+              GetExchangeHashSeed(runtime_state_->query_id()));
           EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
         }
       }
@@ -497,6 +500,45 @@ class DataStreamTest : public testing::Test {
     }
   }
 
+  // Runs a hash-partitioned exchange; returns a map of receiver number to its data values.
+  unordered_map<int, multiset<int64_t>> GetHashPartitionedReceiversDataMap(
+      int num_receivers, bool reset_hash_seed) {
+    int num_senders = 1;
+    int buffer_size = 1024;
+    bool merging = false;
+    unordered_map<int, multiset<int64_t>> receiver_data_map;
+    Reset();
+    for (int i = 0; i < num_receivers; ++i) {
+      StartReceiver(TPartitionType::HASH_PARTITIONED, num_senders, i,
+          buffer_size, merging);
+    }
+    for (int i = 0; i < num_senders; ++i) {
+      StartSender(TPartitionType::HASH_PARTITIONED, buffer_size,
+          reset_hash_seed);
+    }
+    JoinSenders();
+    CheckSenders();
+    JoinReceivers();
+    receiver_data_map.clear();
+
+    for (int i = 0; i < receiver_info_.size(); i++) {
+      // Map this receiver's number to a copy of its data values.
+      ReceiverInfo* info = receiver_info_[i].get();
+      multiset<int64_t> data_set;
+      for (multiset<int64_t>::iterator j = info->data_values.begin();
+           j != info->data_values.end(); ++j) {
+        data_set.insert(*j);
+      }
+      receiver_data_map[info->receiver_num] = data_set;
+    }
+    return receiver_data_map;
+  }
+
+  /// Returns the exchange hash seed for 'query_id': the seed constant XOR query_id.hi.
+  uint64_t GetExchangeHashSeed(TUniqueId query_id) {
+    return 0x66bd68df22c3ef37 ^ query_id.hi;
+  }
+
   void CheckSenders() {
     for (int i = 0; i < sender_info_.size(); ++i) {
       EXPECT_OK(sender_info_[i]->status);
@@ -520,13 +562,13 @@ class DataStreamTest : public testing::Test {
   }
 
   void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
-                   int channel_buffer_size = 1024) {
+      int channel_buffer_size = 1024, bool reset_hash_seed = false) {
     VLOG_QUERY << "start sender";
     int num_senders = sender_info_.size();
     sender_info_.emplace_back(make_unique<SenderInfo>());
     sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-            partition_type, sender_info_[num_senders].get()));
+            partition_type, sender_info_[num_senders].get(), reset_hash_seed));
   }
 
   void JoinSenders() {
@@ -537,7 +579,7 @@ class DataStreamTest : public testing::Test {
   }
 
   void Sender(int sender_num, int channel_buffer_size,
-      TPartitionType::type partition_type, SenderInfo* info) {
+      TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
@@ -548,8 +590,17 @@ class DataStreamTest : public testing::Test {
     // according to the 'is_thrift' option.
     scoped_ptr<DataSink> sender;
 
-    sender.reset(new KrpcDataStreamSender(-1, sender_num,
-        *(static_cast<const KrpcDataStreamSenderConfig*>(data_sink)),
+    KrpcDataStreamSenderConfig& config =
+        *(static_cast<KrpcDataStreamSenderConfig*>(data_sink));
+
+    // Reset the hash seed with a new query id. Useful for testing hash exchanges are
+    // random.
+    if (reset_hash_seed) {
+      config.exchange_hash_seed_ =
+          GetExchangeHashSeed(UuidToQueryId(random_generator()()));
+    }
+
+    sender.reset(new KrpcDataStreamSender(-1, sender_num, config,
         data_sink->tsink_->stream_sink, dest_, channel_buffer_size, &state));
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
@@ -675,6 +726,29 @@ TEST_F(DataStreamTest, BasicTest) {
   }
 }
 
+// Tests that streams with different query ids hash to different destinations.
+TEST_F(DataStreamTest, HashPartitionTest) {
+  bool result = false;
+  int num_receivers = 4;
+
+  unordered_map<int, multiset<int64_t>> receiver_data_map_1 =
+      GetHashPartitionedReceiversDataMap(num_receivers, false);
+
+  unordered_map<int, multiset<int64_t>> receiver_data_map_2 =
+      GetHashPartitionedReceiversDataMap(num_receivers, true);
+
+  // Check that the data values received differ between the two runs.
+  for (int i = 0; i < num_receivers; ++i) {
+    // Compare the data values in the receivers for the two queries. Verify the values
+    // don't match for at least one receiver.
+    if (receiver_data_map_1[i] != receiver_data_map_2[i]) {
+      result = true;
+      break;
+    }
+  }
+  ASSERT_EQ(result, true);
+}
+
 // This test is to exercise a previously present deadlock path which is now fixed, to
 // ensure that the deadlock does not happen anymore. It does this by doing the following:
 // This test starts multiple senders to send to the same receiver. It makes sure that
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index d211986..4865757 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -85,6 +85,8 @@ Status KrpcDataStreamSenderConfig::Init(
     RETURN_IF_ERROR(
         ScalarExpr::Create(tsink_->stream_sink.output_partition.partition_exprs,
             *input_row_desc_, state, &partition_exprs_));
+    exchange_hash_seed_ =
+        KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi;
   }
   return Status::OK();
 }
@@ -715,6 +717,7 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
     partition_exprs_(sink_config.partition_exprs_),
     dest_node_id_(sink.dest_node_id),
     next_unknown_partition_(0),
+    exchange_hash_seed_(sink_config.exchange_hash_seed_),
     hash_and_add_rows_fn_(sink_config.hash_and_add_rows_fn_) {
   DCHECK_GT(destinations.size(), 0);
   DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
@@ -831,7 +834,7 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow(
 
   // Store the initial seed to hash_val
   llvm::Value* hash_val =
-      codegen->GetI64Constant(KrpcDataStreamSender::EXCHANGE_HASH_SEED);
+      codegen->GetI64Constant(exchange_hash_seed_);
 
   // Unroll the loop and codegen each of the partition expressions
   for (int i = 0; i < partition_exprs_.size(); ++i) {
@@ -973,7 +976,7 @@ Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row
 }
 
 uint64_t KrpcDataStreamSender::HashRow(TupleRow* row) {
-  uint64_t hash_val = EXCHANGE_HASH_SEED;
+  uint64_t hash_val = exchange_hash_seed_;
   for (ScalarExprEvaluator* eval : partition_expr_evals_) {
     void* partition_val = eval->GetValue(row);
     // We can't use the crc hash function here because it does not result in
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index e3801e6..e74e82f 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -56,6 +56,9 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
   /// per-row partition values for shuffling exchange;
   std::vector<ScalarExpr*> partition_exprs_;
 
+  /// Hash seed used for exchanges. Query id will be used to seed the hash function.
+  uint64_t exchange_hash_seed_;
+
   /// Type and pointer for the codegen'd KrpcDataStreamSender::HashAndAddRows()
   /// function. NULL if codegen is disabled or failed.
   typedef Status (*HashAndAddRowsFn)(KrpcDataStreamSender*, RowBatch* row);
@@ -139,8 +142,8 @@ class KrpcDataStreamSender : public DataSink {
   /// KrpcDataStreamSender::HashRow() symbol. Used for call-site replacement.
   static const char* HASH_ROW_SYMBOL;
 
-  /// An arbitrary hash seed used for exchanges.
-  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
+  /// An arbitrary constant used to seed the hash.
+  static constexpr uint64_t EXCHANGE_HASH_SEED_CONST = 0x66bd68df22c3ef37;
 
   static const char* LLVM_CLASS_NAME;
 
@@ -262,6 +265,9 @@ class KrpcDataStreamSender : public DataSink {
   /// or when errors are encountered.
   int next_unknown_partition_;
 
+  /// Hash seed used for exchanges. Query id will be used to seed the hash function.
+  uint64_t exchange_hash_seed_;
+
   /// Pointer for the codegen'd HashAndAddRows() function.
   /// NULL if codegen is disabled or failed.
   const KrpcDataStreamSenderConfig::HashAndAddRowsFn& hash_and_add_rows_fn_;