Posted to commits@impala.apache.org by cs...@apache.org on 2023/09/11 14:14:07 UTC

[impala] branch master updated: IMPALA-12430: Skip compression when sending row batches within same process

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2d2b276 IMPALA-12430: Skip compression when sending row batches within same process
fb2d2b276 is described below

commit fb2d2b27641a95f51b6789639fab73b60abd7bc5
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Sep 7 15:08:59 2023 +0200

    IMPALA-12430: Skip compression when sending row batches within same process
    
    LZ4 compression doesn't seem useful when the RowBatch is sent to a
    fragment instance within the same process instead of a remote host.
    
    After this change, KrpcDataStreamSender skips compression for
    channels whose destination is in the same process.
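
    For illustration, here is a minimal standalone sketch of the idea
    (hypothetical names, not the actual Impala code; the real change is in
    the diff below): locality is decided once per channel by comparing the
    destination's full "host:port" KRPC address with that of the current
    process, and the serializer only attempts LZ4 compression for batches
    that are not flagged as local.

      #include <cstdint>
      #include <string>

      struct KrpcAddress { std::string host; int port; };

      static std::string ToString(const KrpcAddress& a) {
        return a.host + ":" + std::to_string(a.port);
      }

      // "Local" means the exact same process: host AND port must match.
      static bool IsSameProcess(const KrpcAddress& self,
          const KrpcAddress& dest) {
        return ToString(self) == ToString(dest);
      }

      // Sketch of the serializer's decision: only non-local, non-empty
      // batches are candidates for the LZ4 attempt (the compressed buffer
      // is then kept only if it turns out smaller).
      static bool ShouldTryCompression(int64_t size, bool is_local) {
        return size > 0 && !is_local;
      }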
    
    Other changes:
    - OutboundRowBatch is moved to a separate file to make the commonly
      included row-batch.h lighter.
    - TestObservability.test_global_exchange_counters had to be changed
      because skipping compression changes the ExchangeScanRatio metric.
      Also added a sleep to the test query because it was flaky on my
      machine (it doesn't seem flaky in Jenkins runs; probably my CPU is
      faster).
    
    See the Jira for more details on tasks that could be skipped in
    intra-process RowBatch transfers. Of these, compression is both the
    most expensive and the easiest to avoid.
    
    Note that it may also make sense to skip compression when the target
    is not in the same process but resides on the same host. AFAIK this
    setup is not typical in production environments, and it would
    complicate testing compression, because impalad processes often run on
    the same host during tests. For these reasons it seems better to skip
    compression only when both the host and the port are the same.
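
    For contrast, a hypothetical same-host (rather than same-process)
    check would compare only the hostname and ignore the port, roughly as
    sketched below; this change deliberately does not do that:

      #include <string>

      // Would also treat two impalads on one machine as "local", which is
      // exactly the setup used by the minicluster tests.
      static bool IsSameHost(const std::string& self_host,
          const std::string& dest_host) {
        return self_host == dest_host;
      }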
    
    The TPCH benchmark shows a significant improvement, but it uses only
    3 impalad processes, so 1/3 of the exchanges are affected; in bigger
    clusters the improvement should be much smaller.
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCH(42) | parquet / none / none | 3.59    | -4.95%     | 2.37       | -2.51%         |
    +----------+-----------------------+---------+------------+------------+----------------+
    
    Change-Id: I7ea23fd1f0f10f72f3dbd8594f3def3ee190230a
    Reviewed-on: http://gerrit.cloudera.org:8080/20462
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Daniel Becker <da...@cloudera.com>
---
 be/src/benchmarks/row-batch-serialize-benchmark.cc |  1 +
 be/src/runtime/krpc-data-stream-sender.cc          | 28 +++++--
 be/src/runtime/krpc-data-stream-sender.h           |  3 +-
 be/src/runtime/outbound-row-batch.h                | 91 ++++++++++++++++++++++
 be/src/runtime/row-batch-serialize-test.cc         |  1 +
 be/src/runtime/row-batch.cc                        |  3 +-
 be/src/runtime/row-batch.h                         | 56 +------------
 tests/query_test/test_observability.py             | 10 ++-
 8 files changed, 128 insertions(+), 65 deletions(-)

diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 40421ce7f..d772855ad 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -21,6 +21,7 @@
 
 #include "common/init.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/outbound-row-batch.h"
 #include "runtime/raw-value.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index b8759008c..499dba0e9 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -149,13 +149,15 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // AddRow() and not sent directly via SendBatch().
   Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
       const std::string& hostname, const NetworkAddressPB& destination,
-      const UniqueIdPB& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
+      const UniqueIdPB& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size,
+      bool is_local)
     : parent_(parent),
       row_desc_(row_desc),
       hostname_(hostname),
       address_(destination),
       fragment_instance_id_(fragment_instance_id),
-      dest_node_id_(dest_node_id) {
+      dest_node_id_(dest_node_id),
+      is_local_(is_local) {
     DCHECK(IsResolvedAddress(address_));
   }
 
@@ -206,6 +208,8 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // The type for a RPC worker function.
   typedef boost::function<Status()> DoRpcFn;
 
+  bool IsLocal() const { return is_local_; }
+
  private:
   // The parent data stream sender owning this channel. Not owned.
   KrpcDataStreamSender* parent_;
@@ -220,6 +224,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   const UniqueIdPB fragment_instance_id_;
   const PlanNodeId dest_node_id_;
 
+  // True if the target fragment instance runs within the same process.
+  const bool is_local_;
+
   // The row batch for accumulating rows copied from AddRow().
   // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
   scoped_ptr<RowBatch> batch_;
@@ -378,7 +385,7 @@ Status KrpcDataStreamSender::Channel::Init(
 
   // Init outbound_batches_.
   for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) {
-    outbound_batches_.emplace_back(allocator);
+    outbound_batches_.emplace_back(allocator, is_local_);
   }
   return Status::OK();
 }
@@ -738,10 +745,14 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
       || sink.output_partition.type == TPartitionType::RANDOM
       || sink.output_partition.type == TPartitionType::KUDU);
 
+  string process_address =
+      NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address());
   for (const auto& destination : destinations) {
+    bool is_local =
+        process_address == NetworkAddressPBToString(destination.krpc_backend());
     channels_.emplace_back(new Channel(this, row_desc_, destination.address().hostname(),
         destination.krpc_backend(), destination.fragment_instance_id(), sink.dest_node_id,
-        per_channel_buffer_size));
+        per_channel_buffer_size, is_local));
   }
 
   if (partition_type_ == TPartitionType::UNPARTITIONED
@@ -786,9 +797,14 @@ Status KrpcDataStreamSender::Prepare(
       new MemTracker(-1, "RowBatchSerialization", mem_tracker_.get()));
   char_mem_tracker_allocator_.reset(
       new CharMemTrackerAllocator(outbound_rb_mem_tracker_));
-
+  string process_address =
+       NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address());
   for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) {
-    outbound_batches_.emplace_back(char_mem_tracker_allocator_);
+    // Only skip compression if there is a single channel and destination is in the same
+    // process. TODO: could be optimized to send the uncompressed buffer to the local
+    // targets to avoid decompression cost at the receiver.
+    bool is_local = channels_.size() == 1 && channels_[0]->IsLocal();
+    outbound_batches_.emplace_back(char_mem_tracker_allocator_, is_local);
   }
 
   for (int i = 0; i < channels_.size(); ++i) {
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index 21074fa66..4308827f8 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -29,13 +29,14 @@
 #include "exec/data-sink.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
+#include "runtime/outbound-row-batch.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
 
 class KrpcDataStreamSender;
 class MemTracker;
+class RowBatch;
 class RowDescriptor;
 class TDataStreamSink;
 class TNetworkAddress;
diff --git a/be/src/runtime/outbound-row-batch.h b/be/src/runtime/outbound-row-batch.h
new file mode 100644
index 000000000..4411c6da4
--- /dev/null
+++ b/be/src/runtime/outbound-row-batch.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstring>
+#include <vector>
+
+#include "gen-cpp/row_batch.pb.h"
+#include "kudu/util/slice.h"
+#include "runtime/mem-tracker.h"
+
+namespace impala {
+
+template <typename K, typename V> class FixedSizeHashTable;
+class MemTracker;
+class RowBatchSerializeTest;
+class RuntimeState;
+
+/// A KRPC outbound row batch which contains the serialized row batch header and buffers
+/// for holding the tuple offsets and tuple data.
+class OutboundRowBatch {
+ public:
+  OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator,
+      bool skip_compression=false)
+    : tuple_data_(*allocator.get()), compression_scratch_(*allocator.get()),
+      skip_compression_(skip_compression) {}
+
+  const RowBatchHeaderPB* header() const { return &header_; }
+
+  /// Returns the serialized tuple offsets' vector as a kudu::Slice.
+  /// The tuple offsets vector is sent as KRPC sidecar.
+  kudu::Slice TupleOffsetsAsSlice() const {
+    return kudu::Slice(
+        const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(tuple_offsets_.data())),
+        tuple_offsets_.size() * sizeof(tuple_offsets_[0]));
+  }
+
+  /// Returns the serialized tuple data's buffer as a kudu::Slice.
+  /// The tuple data is sent as KRPC sidecar.
+  kudu::Slice TupleDataAsSlice() const {
+    return kudu::Slice(
+        const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(tuple_data_.data())),
+        tuple_data_.length());
+  }
+
+  /// Returns true if the header has been initialized and ready to be sent.
+  /// This entails setting some fields initialized in RowBatch::Serialize().
+  bool IsInitialized() const {
+     return header_.has_num_rows() && header_.has_uncompressed_size() &&
+         header_.has_compression_type();
+  }
+
+ private:
+  friend class RowBatch;
+  friend class RowBatchSerializeBaseline;
+
+  /// The serialized header which contains the meta-data of the row batch such as the
+  /// number of rows and compression scheme used etc.
+  RowBatchHeaderPB header_;
+
+  /// Contains offsets into 'tuple_data_' of all tuples in a row batch. -1 refers to
+  /// a NULL tuple.
+  vector<int32_t> tuple_offsets_;
+
+  /// Contains the actual data of all the tuples. The data could be compressed.
+  TrackedString tuple_data_;
+
+  /// Contains the compression scratch for the compressed data in serialization.
+  /// The compression_scratch_ will be swapped with tuple_data_ if the compressed data
+  /// is shorter.
+  TrackedString compression_scratch_;
+
+  bool skip_compression_;
+};
+
+}
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index bfd88c3cf..1cc84adc0 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -22,6 +22,7 @@
 #include "runtime/collection-value.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/outbound-row-batch.h"
 #include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index c7fc4c4f3..144734dfe 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -23,6 +23,7 @@
 
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/outbound-row-batch.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/compress.h"
@@ -273,7 +274,7 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_b
 
   *is_compressed = false;
 
-  if (size > 0) {
+  if (size > 0 && !output_batch->skip_compression_) {
     // Try compressing tuple_data to compression_scratch_, swap if compressed data is
     // smaller
     Lz4Compressor compressor(nullptr, false);
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3620b36e5..80e9ac054 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -27,7 +27,6 @@
 #include "common/compiler-util.h"
 #include "common/logging.h"
 #include "gen-cpp/row_batch.pb.h"
-#include "kudu/util/slice.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
@@ -41,65 +40,14 @@ namespace impala {
 
 template <typename K, typename V> class FixedSizeHashTable;
 class MemTracker;
+class OutboundRowBatch;
+class RowBatchHeaderPB;
 class RowBatchSerializeTest;
 class RuntimeState;
 class Tuple;
 class TupleRow;
 class TupleDescriptor;
 
-/// A KRPC outbound row batch which contains the serialized row batch header and buffers
-/// for holding the tuple offsets and tuple data.
-class OutboundRowBatch {
- public:
-  OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator)
-    : tuple_data_(*allocator.get()), compression_scratch_(*allocator.get()) {}
-
-  const RowBatchHeaderPB* header() const { return &header_; }
-
-  /// Returns the serialized tuple offsets' vector as a kudu::Slice.
-  /// The tuple offsets vector is sent as KRPC sidecar.
-  kudu::Slice TupleOffsetsAsSlice() const {
-    return kudu::Slice(
-        const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(tuple_offsets_.data())),
-        tuple_offsets_.size() * sizeof(tuple_offsets_[0]));
-  }
-
-  /// Returns the serialized tuple data's buffer as a kudu::Slice.
-  /// The tuple data is sent as KRPC sidecar.
-  kudu::Slice TupleDataAsSlice() const {
-    return kudu::Slice(
-        const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(tuple_data_.data())),
-        tuple_data_.length());
-  }
-
-  /// Returns true if the header has been initialized and ready to be sent.
-  /// This entails setting some fields initialized in RowBatch::Serialize().
-  bool IsInitialized() const {
-     return header_.has_num_rows() && header_.has_uncompressed_size() &&
-         header_.has_compression_type();
-  }
-
- private:
-  friend class RowBatch;
-  friend class RowBatchSerializeBaseline;
-
-  /// The serialized header which contains the meta-data of the row batch such as the
-  /// number of rows and compression scheme used etc.
-  RowBatchHeaderPB header_;
-
-  /// Contains offsets into 'tuple_data_' of all tuples in a row batch. -1 refers to
-  /// a NULL tuple.
-  vector<int32_t> tuple_offsets_;
-
-  /// Contains the actual data of all the tuples. The data could be compressed.
-  TrackedString tuple_data_;
-
-  /// Contains the compression scratch for the compressed data in serialization.
-  /// The compression_scratch_ will be swapped with tuple_data_ if the compressed data
-  /// is shorter.
-  TrackedString compression_scratch_;
-};
-
 /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
 /// The maximum number of rows is fixed at the time of construction.
 /// The row batch can reference various types of memory.
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index d408b10ee..9ea951aa5 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -549,15 +549,19 @@ class TestObservability(ImpalaTestSuite):
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   def test_global_exchange_counters(self):
     """Test that global exchange counters are set correctly."""
-    query = """select count(*) from tpch_parquet.orders o inner join tpch_parquet.lineitem
-        l on o.o_orderkey = l.l_orderkey group by o.o_clerk limit 10"""
+
+    # periodic_counter_update_period_ms is 500 ms by default. Use sleep(50) with limit 10
+    # to ensure that there is at least one sample in sampling counters.
+    query = """select count(*), sleep(50) from tpch_parquet.orders o
+        inner join tpch_parquet.lineitem l on o.o_orderkey = l.l_orderkey
+        group by o.o_clerk limit 10"""
     profile = self.execute_query(query).runtime_profile
 
     # TimeSeriesCounter should be prefixed with a hyphen.
     assert "  MemoryUsage" not in profile
     assert "- MemoryUsage" in profile
 
-    assert "ExchangeScanRatio: 3.19" in profile
+    assert "ExchangeScanRatio: 4.63" in profile
 
     keys = ["TotalBytesSent", "TotalScanBytesSent", "TotalInnerBytesSent"]
     counters = defaultdict(int)