You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2019/05/29 23:28:23 UTC

[impala] 01/04: cleanup: extract RowBatchQueue into its own file

This is an automated email from the ASF dual-hosted git repository.

arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 537a4646dc1bf0a204354f23ed0905674c71eae5
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Jul 12 23:10:38 2018 -0700

    cleanup: extract RowBatchQueue into its own file
    
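    While looking at IMPALA-7096, I noticed that RowBatchQueue was
    implemented in a strange place: as a class nested inside ExecNode
    (exec/exec-node.h and exec/exec-node.cc). Move it into its own files,
    runtime/row-batch-queue.h and runtime/row-batch-queue.cc.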
    
    Change-Id: I3577c1c6920b8cf858c8d49f8812ccc305d833f6
    Reviewed-on: http://gerrit.cloudera.org:8080/10943
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-join-node.h         |  1 +
 be/src/exec/exec-node.cc                 | 31 ---------------
 be/src/exec/exec-node.h                  | 35 -----------------
 be/src/exec/hdfs-scan-node.cc            |  1 +
 be/src/exec/kudu-scan-node.cc            |  1 +
 be/src/exec/scan-node.cc                 |  1 +
 be/src/exec/scan-node.h                  |  1 +
 be/src/exec/scanner-context.cc           |  1 +
 be/src/runtime/CMakeLists.txt            |  1 +
 be/src/runtime/data-stream-recvr.cc      |  1 +
 be/src/runtime/krpc-data-stream-recvr.cc |  1 +
 be/src/runtime/row-batch-queue.cc        | 55 +++++++++++++++++++++++++++
 be/src/runtime/row-batch-queue.h         | 65 ++++++++++++++++++++++++++++++++
 13 files changed, 129 insertions(+), 66 deletions(-)

diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index b7dd79a..8198ad0 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -25,6 +25,7 @@
 
 #include "exec/exec-node.h"
 #include "util/promise.h"
+#include "util/stopwatch.h"
 
 #include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
 
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index a44a5c1..5dca184 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -80,37 +80,6 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
   return p->metadata();
 }
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {
-}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
-  DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (!BlockingPut(move(batch))) {
-    lock_guard<SpinLock> l(lock_);
-    cleanup_queue_.push_back(move(batch));
-  }
-}
-
-unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
-  unique_ptr<RowBatch> result;
-  if (BlockingGet(&result)) return result;
-  return unique_ptr<RowBatch>();
-}
-
-void ExecNode::RowBatchQueue::Cleanup() {
-  unique_ptr<RowBatch> batch = NULL;
-  while ((batch = GetBatch()) != NULL) {
-    batch.reset();
-  }
-
-  lock_guard<SpinLock> l(lock_);
-  cleanup_queue_.clear();
-}
-
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : id_(tnode.node_id),
     type_(tnode.node_type),
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 9a87a56..a62ed6c 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -30,7 +30,6 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "runtime/reservation-manager.h"
-#include "util/blocking-queue.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
@@ -243,40 +242,6 @@ class ExecNode {
     return reservation_manager_.ReleaseUnusedReservation();
   }
 
-  /// Extends blocking queue for row batches. Row batches have a property that
-  /// they must be processed in the order they were produced, even in cancellation
-  /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-  /// and we need to make sure those ptrs stay valid.
-  /// Row batches that are added after Shutdown() are queued in another queue, which can
-  /// be cleaned up during Close().
-  /// All functions are thread safe.
-  class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
-   public:
-    /// max_batches is the maximum number of row batches that can be queued.
-    /// When the queue is full, producers will block.
-    RowBatchQueue(int max_batches);
-    ~RowBatchQueue();
-
-    /// Adds a batch to the queue. This is blocking if the queue is full.
-    void AddBatch(std::unique_ptr<RowBatch> batch);
-
-    /// Gets a row batch from the queue. Returns NULL if there are no more.
-    /// This function blocks.
-    /// Returns NULL after Shutdown().
-    std::unique_ptr<RowBatch> GetBatch();
-
-    /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-    /// after this is called.
-    void Cleanup();
-
-   private:
-    /// Lock protecting cleanup_queue_
-    SpinLock lock_;
-
-    /// Queue of orphaned row batches
-    std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
-  };
-
   /// Unique within a single plan tree.
   int id_;
   TPlanNodeType::type type_;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1ceaf2c..5d4f9b0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -31,6 +31,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 48816f9..30194f9 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 01aa269..4d59eed 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 63bb59b..1d0728c 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class RowBatchQueue;
 class TScanRange;
 
 /// Abstract base class of all scan nodes. Subclasses support different storage layers
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 75aacee..a9fad6a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,6 +26,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2fea5bd..e09b27c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(Runtime
   reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
+  row-batch-queue.cc
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 8d9047f..c9a9ab9 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -22,6 +22,7 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index be51f32..3933e02 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -31,6 +31,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "service/data-stream-service.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
new file mode 100644
index 0000000..1fd5555
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/row-batch-queue.h"
+
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+
+// Cleanup() is expected to have emptied cleanup_queue_ before destruction.
+RowBatchQueue::~RowBatchQueue() { DCHECK(cleanup_queue_.empty()); }
+
+// A failed BlockingPut() (per the class comment, after Shutdown()) parks the batch in
+// cleanup_queue_ for Cleanup(). NOTE(review): relies on a failed BlockingPut() leaving
+// 'batch' intact despite the move() cast - confirm in util/blocking-queue.h.
+void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (BlockingPut(move(batch))) return;
+  lock_guard<SpinLock> guard(lock_);
+  cleanup_queue_.push_back(move(batch));
+}
+
+unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> next;
+  if (!BlockingGet(&next)) return nullptr;
+  return next;
+}
+
+// Frees each batch GetBatch() hands back (the temporary dies before the next call),
+// in the order returned, then frees any batches parked in cleanup_queue_.
+void RowBatchQueue::Cleanup() {
+  while (GetBatch() != nullptr) {}
+
+  lock_guard<SpinLock> guard(lock_);
+  cleanup_queue_.clear();
+}
+} // namespace impala
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
new file mode 100644
index 0000000..bd2f551
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+#define IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+
+#include <list>
+#include <memory>
+
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Extends BlockingQueue for row batches. Row batches must be processed in the order
+/// they were produced, even in cancellation paths: preceding row batches can contain
+/// ptrs to memory in subsequent row batches and those ptrs must stay valid.
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue (cleanup_queue_), which can be cleaned up during Close().
+/// All functions are thread safe.
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+ public:
+  /// max_batches is the maximum number of row batches that can be queued.
+  /// When the queue is full, producers will block.
+  explicit RowBatchQueue(int max_batches);
+  ~RowBatchQueue();
+
+  /// Adds a batch to the queue, blocking while the queue is full. If the queue has
+  /// been shut down, the batch is moved to the cleanup queue instead.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Gets a row batch from the queue. This function blocks. Returns nullptr if there
+  /// are no more, including after Shutdown().
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Drains the queue with GetBatch(), then deletes all row batches in cleanup_queue_.
+  /// GetBatch() blocks, so this is presumably meant to be called after Shutdown().
+  /// Not valid to call AddBatch() after this is called.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches, i.e. those added after Shutdown().
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+};
+} // namespace impala
+#endif // IMPALA_RUNTIME_ROW_BATCH_QUEUE_H