You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/21 18:29:31 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #12228: ARROW-15410: [C++][Datasets] Improve memory usage of datasets API when scanning parquet

lidavidm commented on code in PR #12228:
URL: https://github.com/apache/arrow/pull/12228#discussion_r855431141


##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -593,9 +593,13 @@ class HashJoinNode : public ExecNode {
     return Status::OK();
   }
 
-  void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); }
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    // TODO(ARROW-16246)

Review Comment:
   Should we keep the EVENT calls at least?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+  uint64_t resume_if_below;
+  uint64_t pause_if_above;
+};
+
 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
 ///
 /// Emitted batches will not be ordered.
 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
-                           util::BackpressureOptions backpressure = {})
-      : generator(generator), backpressure(std::move(backpressure)) {}
-
+  explicit SinkNodeOptions(
+      std::function<Future<util::optional<ExecBatch>>()>* generator,
+      BackpressureOptions backpressure = {},
+      std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)
+      : generator(generator),
+        backpressure(std::move(backpressure)),
+        backpressure_monitor(backpressure_monitor) {}
+
+  /// \brief A pointer to a generator of batches.
+  ///
+  /// This will be set when the node is added to the plan and should be used to consume
+  /// data from the plan.  If this function is not called frequently enough then the sink
+  /// node will start to accumulate data and may apply backpressure.
   std::function<Future<util::optional<ExecBatch>>()>* generator;
-  util::BackpressureOptions backpressure;
+  /// \brief Options to control when to apply backpressure
+  ///
+  /// This is optional, the default is to never apply backpressure.  If the plan is not
+  /// consumed quickly enough the system may eventually run out of memory.
+  BackpressureOptions backpressure;
+  /// \brief A pointer to a backpressure monitor
+  ///
+  /// This will be set when the node is added to the plan.  This can be used to inspect
+  /// the amount of data currently queued in the sink node.  This is an optional utility
+  /// and backpressure can be applied even if this is not used.
+  std::shared_ptr<BackpressureMonitor>* backpressure_monitor;
+};
+
+/// \brief Control used by a SinkNodeConsumer to pause & resume
+///
+/// Callers should ensure that they do not call Pause and Resume simultaneously and they
+/// should sequence things so that a call to Pause() is always followed by an eventual
+/// call to Resume()
+class BackpressureControl {

Review Comment:
   This class probably needs ARROW_EXPORT.



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {

Review Comment:
   This probably needs an ARROW_EXPORT annotation?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+  uint64_t resume_if_below;
+  uint64_t pause_if_above;
+};
+
 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
 ///
 /// Emitted batches will not be ordered.
 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
-                           util::BackpressureOptions backpressure = {})
-      : generator(generator), backpressure(std::move(backpressure)) {}
-
+  explicit SinkNodeOptions(
+      std::function<Future<util::optional<ExecBatch>>()>* generator,
+      BackpressureOptions backpressure = {},
+      std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)

Review Comment:
   It seems weird to have a pointer to a shared_ptr, rather than just a shared_ptr that might hold nullptr.



##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -46,31 +46,81 @@ using internal::checked_cast;
 namespace compute {
 namespace {
 
+class BackpressureResevoir : public BackpressureMonitor {

Review Comment:
   nit: it seems this is spelled Rese**r**voir



##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -103,6 +157,28 @@ class SinkNode : public ExecNode {
 
   Future<> finished() override { return finished_; }
 
+  void RecordBackpressureBytesUsed(const ExecBatch& batch) {
+    if (backpressure_queue_->enabled()) {
+      uint64_t bytes_used = static_cast<uint64_t>(batch.TotalBufferSize());
+      auto state_change = backpressure_queue_->RecordProduced(bytes_used);
+      if (state_change >= 0) {
+        EVENT(span_, "Backpressure applied", {{"counter", state_change}});

Review Comment:
   nit, but maybe namespace the span attribute name?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }
+
+  uint64_t resume_if_below;
+  uint64_t pause_if_above;
+};
+
 /// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
 ///
 /// Emitted batches will not be ordered.
 class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
-                           util::BackpressureOptions backpressure = {})
-      : generator(generator), backpressure(std::move(backpressure)) {}
-
+  explicit SinkNodeOptions(
+      std::function<Future<util::optional<ExecBatch>>()>* generator,
+      BackpressureOptions backpressure = {},
+      std::shared_ptr<BackpressureMonitor>* backpressure_monitor = NULLPTR)

Review Comment:
   Ah, this is because it's an out pointer?



##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -46,31 +46,81 @@ using internal::checked_cast;
 namespace compute {
 namespace {
 
+class BackpressureResevoir : public BackpressureMonitor {
+ public:
+  BackpressureResevoir(uint64_t resume_if_below, uint64_t pause_if_above)
+      : bytes_used_(0),
+        state_change_counter_(0),
+        resume_if_below_(resume_if_below),
+        pause_if_above_(pause_if_above) {}
+
+  uint64_t bytes_in_use() const override { return bytes_used_; }
+  bool is_paused() const override { return state_change_counter_ % 2 == 1; }
+  bool enabled() const { return pause_if_above_ > 0; }
+
+  int32_t RecordProduced(uint64_t num_bytes) {
+    std::lock_guard<std::mutex> lg(mutex_);
+    bool was_under = bytes_used_ <= pause_if_above_;
+    bytes_used_ += num_bytes;
+    if (was_under && bytes_used_ > pause_if_above_) {
+      return ++state_change_counter_;
+    }
+    return -1;
+  }
+
+  int32_t RecordConsumed(uint64_t num_bytes) {
+    std::lock_guard<std::mutex> lg(mutex_);
+    bool was_over = bytes_used_ >= resume_if_below_;
+    bytes_used_ -= num_bytes;
+    if (was_over && bytes_used_ < resume_if_below_) {
+      return ++state_change_counter_;
+    }
+    return -1;
+  }
+
+ private:
+  std::mutex mutex_;
+  uint64_t bytes_used_;
+  int32_t state_change_counter_;
+  const uint64_t resume_if_below_;
+  const uint64_t pause_if_above_;
+};
+
 class SinkNode : public ExecNode {
  public:
   SinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
            AsyncGenerator<util::optional<ExecBatch>>* generator,
-           util::BackpressureOptions backpressure)
+           BackpressureOptions backpressure,
+           std::shared_ptr<BackpressureMonitor>* backpressure_monitor_out)
       : ExecNode(plan, std::move(inputs), {"collected"}, {},
                  /*num_outputs=*/0),
-        producer_(MakeProducer(generator, std::move(backpressure))) {}
+        backpressure_queue_(std::make_shared<BackpressureResevoir>(
+            backpressure.resume_if_below, backpressure.pause_if_above)),
+        push_gen_(),
+        producer_(push_gen_.producer()) {
+    if (backpressure_monitor_out) {
+      *backpressure_monitor_out = backpressure_queue_;
+    }
+    AsyncGenerator<util::optional<ExecBatch>> captured_gen = push_gen_;

Review Comment:
   nit, but if we're capturing `this` already, is it necessary to also copy/capture the generator separately?



##########
cpp/src/arrow/compute/exec/sink_node.cc:
##########
@@ -46,31 +46,81 @@ using internal::checked_cast;
 namespace compute {
 namespace {
 
+class BackpressureResevoir : public BackpressureMonitor {
+ public:
+  BackpressureResevoir(uint64_t resume_if_below, uint64_t pause_if_above)
+      : bytes_used_(0),
+        state_change_counter_(0),
+        resume_if_below_(resume_if_below),
+        pause_if_above_(pause_if_above) {}
+
+  uint64_t bytes_in_use() const override { return bytes_used_; }
+  bool is_paused() const override { return state_change_counter_ % 2 == 1; }
+  bool enabled() const { return pause_if_above_ > 0; }
+
+  int32_t RecordProduced(uint64_t num_bytes) {

Review Comment:
   It seems the exact return value is never used; perhaps return a bool instead?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -129,17 +129,86 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
   std::vector<FieldRef> keys;
 };
 
+constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30;  // 1GiB
+constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28;   // 256MiB
+
+class BackpressureMonitor {
+ public:
+  virtual ~BackpressureMonitor() = default;
+  virtual uint64_t bytes_in_use() const = 0;
+  virtual bool is_paused() const = 0;
+};
+
+/// \brief Options to control backpressure behavior
+struct ARROW_EXPORT BackpressureOptions {
+  /// \brief Create default options that perform no backpressure
+  BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
+  /// \brief Create options that will perform backpressure
+  ///
+  /// \param resume_if_below The producer should resume producing if the backpressure
+  ///                        queue has fewer than resume_if_below items.
+  /// \param pause_if_above The producer should pause producing if the backpressure
+  ///                       queue has more than pause_if_above items
+  BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
+      : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
+
+  static BackpressureOptions DefaultBackpressure() {
+    return BackpressureOptions(kDefaultBackpressureLowBytes,
+                               kDefaultBackpressureHighBytes);
+  }
+
+  inline bool should_apply_backpressure() const { return pause_if_above > 0; }

Review Comment:
   Does this need to be `inline`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org