You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2019/05/29 23:28:23 UTC
[impala] 01/04: cleanup: extract RowBatchQueue into its own file
This is an automated email from the ASF dual-hosted git repository.
arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 537a4646dc1bf0a204354f23ed0905674c71eae5
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Jul 12 23:10:38 2018 -0700
cleanup: extract RowBatchQueue into its own file
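While looking at IMPALA-7096, I noticed that RowBatchQueue was
implemented in a strange place: as a class nested inside ExecNode. This
change moves it into its own files, be/src/runtime/row-batch-queue.{h,cc}.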
Change-Id: I3577c1c6920b8cf858c8d49f8812ccc305d833f6
Reviewed-on: http://gerrit.cloudera.org:8080/10943
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/blocking-join-node.h | 1 +
be/src/exec/exec-node.cc | 31 ---------------
be/src/exec/exec-node.h | 35 -----------------
be/src/exec/hdfs-scan-node.cc | 1 +
be/src/exec/kudu-scan-node.cc | 1 +
be/src/exec/scan-node.cc | 1 +
be/src/exec/scan-node.h | 1 +
be/src/exec/scanner-context.cc | 1 +
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/data-stream-recvr.cc | 1 +
be/src/runtime/krpc-data-stream-recvr.cc | 1 +
be/src/runtime/row-batch-queue.cc | 55 +++++++++++++++++++++++++++
be/src/runtime/row-batch-queue.h | 65 ++++++++++++++++++++++++++++++++
13 files changed, 129 insertions(+), 66 deletions(-)
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index b7dd79a..8198ad0 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -25,6 +25,7 @@
#include "exec/exec-node.h"
#include "util/promise.h"
+#include "util/stopwatch.h"
#include "gen-cpp/PlanNodes_types.h" // for TJoinOp
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index a44a5c1..5dca184 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -80,37 +80,6 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
return p->metadata();
}
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
- : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {
-}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
- DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
- if (!BlockingPut(move(batch))) {
- lock_guard<SpinLock> l(lock_);
- cleanup_queue_.push_back(move(batch));
- }
-}
-
-unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
- unique_ptr<RowBatch> result;
- if (BlockingGet(&result)) return result;
- return unique_ptr<RowBatch>();
-}
-
-void ExecNode::RowBatchQueue::Cleanup() {
- unique_ptr<RowBatch> batch = NULL;
- while ((batch = GetBatch()) != NULL) {
- batch.reset();
- }
-
- lock_guard<SpinLock> l(lock_);
- cleanup_queue_.clear();
-}
-
ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: id_(tnode.node_id),
type_(tnode.node_type),
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 9a87a56..a62ed6c 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -30,7 +30,6 @@
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/descriptors.h" // for RowDescriptor
#include "runtime/reservation-manager.h"
-#include "util/blocking-queue.h"
#include "util/runtime-profile.h"
namespace impala {
@@ -243,40 +242,6 @@ class ExecNode {
return reservation_manager_.ReleaseUnusedReservation();
}
- /// Extends blocking queue for row batches. Row batches have a property that
- /// they must be processed in the order they were produced, even in cancellation
- /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
- /// and we need to make sure those ptrs stay valid.
- /// Row batches that are added after Shutdown() are queued in another queue, which can
- /// be cleaned up during Close().
- /// All functions are thread safe.
- class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
- public:
- /// max_batches is the maximum number of row batches that can be queued.
- /// When the queue is full, producers will block.
- RowBatchQueue(int max_batches);
- ~RowBatchQueue();
-
- /// Adds a batch to the queue. This is blocking if the queue is full.
- void AddBatch(std::unique_ptr<RowBatch> batch);
-
- /// Gets a row batch from the queue. Returns NULL if there are no more.
- /// This function blocks.
- /// Returns NULL after Shutdown().
- std::unique_ptr<RowBatch> GetBatch();
-
- /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
- /// after this is called.
- void Cleanup();
-
- private:
- /// Lock protecting cleanup_queue_
- SpinLock lock_;
-
- /// Queue of orphaned row batches
- std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
- };
-
/// Unique within a single plan tree.
int id_;
TPlanNodeType::type type_;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1ceaf2c..5d4f9b0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -31,6 +31,7 @@
#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/thread-resource-mgr.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 48816f9..30194f9 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
#include "runtime/mem-pool.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/tuple-row.h"
#include "util/runtime-profile-counters.h"
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 01aa269..4d59eed 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -22,6 +22,7 @@
#include "exprs/scalar-expr.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "util/disk-info.h"
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 63bb59b..1d0728c 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -28,6 +28,7 @@
namespace impala {
+class RowBatchQueue;
class TScanRange;
/// Abstract base class of all scan nodes. Subclasses support different storage layers
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 75aacee..a9fad6a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,6 +26,7 @@
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/runtime-state.h"
#include "runtime/string-buffer.h"
#include "util/debug-util.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2fea5bd..e09b27c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(Runtime
reservation-manager.cc
row-batch.cc
${ROW_BATCH_PROTO_SRCS}
+ row-batch-queue.cc
runtime-filter.cc
runtime-filter-bank.cc
runtime-filter-ir.cc
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 8d9047f..c9a9ab9 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -22,6 +22,7 @@
#include "runtime/data-stream-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/sorted-run-merger.h"
#include "util/condition-variable.h"
#include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index be51f32..3933e02 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -31,6 +31,7 @@
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
#include "runtime/sorted-run-merger.h"
#include "service/data-stream-service.h"
#include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
new file mode 100644
index 0000000..1fd5555
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/row-batch-queue.h"
+
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+
+RowBatchQueue::~RowBatchQueue() {
+  DCHECK(cleanup_queue_.empty());
+}
+
+void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  // Relies on BlockingPut() not consuming 'batch' when it returns false.
+  if (BlockingPut(move(batch))) return;
+  lock_guard<SpinLock> l(lock_);
+  cleanup_queue_.push_back(move(batch));
+}
+
+unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> result;
+  if (BlockingGet(&result)) return result;
+  return nullptr;
+}
+
+void RowBatchQueue::Cleanup() {
+  // Drain and free any batches still in the blocking queue. Each returned batch
+  // is destroyed at the end of the loop condition, before the next GetBatch().
+  while (GetBatch() != nullptr) {}
+
+  // Free batches that AddBatch() could not enqueue (e.g. added after Shutdown()).
+  lock_guard<SpinLock> l(lock_);
+  cleanup_queue_.clear();
+}
+} // namespace impala
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
new file mode 100644
index 0000000..bd2f551
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+#define IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+
+#include <list>
+#include <memory>
+
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Extends blocking queue for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close().
+/// All functions are thread safe.
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+ public:
+  /// max_batches is the maximum number of row batches that can be queued.
+  /// When the queue is full, producers will block.
+  explicit RowBatchQueue(int max_batches);
+  ~RowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Gets a row batch from the queue. This function blocks. Returns nullptr if there
+  /// are no more batches, including after Shutdown().
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Frees all remaining row batches: drains the blocking queue via GetBatch() (which
+  /// blocks, so this is presumably only called after Shutdown()), then deletes all row
+  /// batches in cleanup_queue_. Not valid to call AddBatch() after this is called.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+};
+} // namespace impala
+#endif // IMPALA_RUNTIME_ROW_BATCH_QUEUE_H