Posted to reviews@impala.apache.org by "Sahil Takiar (Code Review)" <ge...@cloudera.org> on 2019/07/24 03:12:35 UTC

[Impala-ASF-CR] IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Hello Michael Ho, Tim Armstrong, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/13883

to look at the new patch set (#7).

Change subject: IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp
......................................................................

IMPALA-8779, IMPALA-8780: RowBatchQueue interface and BufferedPRS imp

Introduces a generic RowBatchQueue interface with a blocking and a
non-blocking implementation. The blocking implementation is a
re-factored version of the current RowBatchQueue. The non-blocking
implementation is a simple wrapper around std::queue. The current
RowBatchQueue, which is used by the scanners, is renamed to
BlockingRowBatchQueue and becomes a subclass of the new RowBatchQueue
interface. This patch stops short of completely abstracting all the
details of the current RowBatchQueue and instead includes a few TODOs.
NonBlockingRowBatchQueue has a maximum capacity; once it is reached,
calls to AddBatch return false.
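
A minimal sketch of the non-blocking queue described above, assuming a
stripped-down RowBatch and a hypothetical method set (GetBatch, IsFull,
IsEmpty, and the max_batches constructor argument are illustrative
names, not taken from the patch). It only shows the bounded
std::queue wrapper whose AddBatch returns false once capacity is hit:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

// Stand-in for Impala's RowBatch; it only carries a row count here.
struct RowBatch {
  int num_rows;
};

// Hypothetical interface; the method set in the actual patch may differ.
class RowBatchQueue {
 public:
  virtual ~RowBatchQueue() = default;
  // Returns false if the batch was not accepted.
  virtual bool AddBatch(std::unique_ptr<RowBatch> batch) = 0;
  // Returns nullptr if the queue is empty.
  virtual std::unique_ptr<RowBatch> GetBatch() = 0;
  virtual bool IsFull() const = 0;
  virtual bool IsEmpty() const = 0;
};

// Not thread safe: callers are expected to provide their own locking.
class NonBlockingRowBatchQueue : public RowBatchQueue {
 public:
  explicit NonBlockingRowBatchQueue(std::size_t max_batches)
      : max_batches_(max_batches) {}

  bool AddBatch(std::unique_ptr<RowBatch> batch) override {
    assert(batch != nullptr);
    // At capacity: reject instead of blocking. In this sketch the
    // rejected batch is destroyed, so callers should check IsFull() first.
    if (IsFull()) return false;
    queue_.push(std::move(batch));
    return true;
  }

  std::unique_ptr<RowBatch> GetBatch() override {
    if (queue_.empty()) return nullptr;
    std::unique_ptr<RowBatch> batch = std::move(queue_.front());
    queue_.pop();
    return batch;
  }

  bool IsFull() const override { return queue_.size() >= max_batches_; }
  bool IsEmpty() const override { return queue_.empty(); }

 private:
  const std::size_t max_batches_;
  std::queue<std::unique_ptr<RowBatch>> queue_;
};
```

Returning false rather than blocking leaves the back-pressure policy to
the caller, which fits a sink that does its own synchronization.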

Implements BufferedPlanRootSink using the new RowBatchQueue interface.
Currently, a NonBlockingRowBatchQueue is injected into the
BufferedPlanRootSink. However, the implementation of
BufferedPlanRootSink is not tied to NonBlockingRowBatchQueue, although
it does assume the RowBatchQueue is not thread safe. This allows a
future patch to add a RowBatchQueue backed by a BufferedTupleStream
without re-factoring BufferedPlanRootSink.

BufferedPlanRootSink FlushFinal blocks until the consumer thread has
processed all RowBatches. This ensures that the coordinator fragment
stays alive until all results are fetched, but allows all other
fragments to be shut down immediately.
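
The blocking behavior of FlushFinal can be sketched roughly as follows.
This is an illustration under stated assumptions, not the code in the
patch: the class name BufferedSinkSketch, the GetNext signature, the
member names, and the ProduceAndDrain helper are all hypothetical, and
batches are reduced to plain row counts. The sink guards a
non-thread-safe queue with its own mutex and uses two condition
variables, one to wake the consumer and one to wake FlushFinal:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

class BufferedSinkSketch {
 public:
  // Producer side: enqueue one batch, represented by its row count.
  void Send(int num_rows) {
    std::lock_guard<std::mutex> l(lock_);
    queue_.push(num_rows);
    rows_available_.notify_one();
  }

  // Producer side: signals end of stream, then blocks until the
  // consumer has removed every queued batch.
  void FlushFinal() {
    std::unique_lock<std::mutex> l(lock_);
    eos_ = true;
    rows_available_.notify_all();
    queue_drained_.wait(l, [this] { return queue_.empty(); });
  }

  // Consumer side: blocks until a batch or end of stream is available.
  // Returns false once the stream is finished and the queue is empty.
  bool GetNext(int* num_rows) {
    std::unique_lock<std::mutex> l(lock_);
    rows_available_.wait(l, [this] { return !queue_.empty() || eos_; });
    if (queue_.empty()) return false;
    *num_rows = queue_.front();
    queue_.pop();
    if (queue_.empty()) queue_drained_.notify_all();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable rows_available_;
  std::condition_variable queue_drained_;
  std::queue<int> queue_;  // Not thread safe; always accessed under lock_.
  bool eos_ = false;
};

// Hypothetical driver: sends batches of 1..num_batches rows while a
// consumer thread fetches them, and returns the total rows fetched.
int ProduceAndDrain(int num_batches) {
  BufferedSinkSketch sink;
  int total_rows = 0;
  std::thread consumer([&sink, &total_rows] {
    int rows = 0;
    while (sink.GetNext(&rows)) total_rows += rows;
  });
  for (int i = 1; i <= num_batches; ++i) sink.Send(i);
  sink.FlushFinal();  // Returns only after the queue has been drained.
  consumer.join();
  return total_rows;
}
```

Because the producer does not return from FlushFinal until the queue is
empty, whatever owns the sink stays alive for as long as results remain
unfetched.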

Testing:
* Running core tests
* Updated tests/query_test/test_result_spooling.py

Follow-up work:
* Add a stress test in test_result_spooling.py to validate the
synchronization logic in BufferedPlanRootSink
* Handle Send calls where num_results < batch->num_rows()
* Add a direct write path in Send that directly writes a RowBatch to a
QueryResultSet, if one is available and if the RowBatchQueue is empty
* Implement a RowBatchQueue backed by a BufferedTupleStream
* Re-factor the resource management logic to release all
non-coordinator fragment resources

Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
---
M be/src/exec/blocking-plan-root-sink.cc
M be/src/exec/blocking-plan-root-sink.h
M be/src/exec/buffered-plan-root-sink.cc
M be/src/exec/buffered-plan-root-sink.h
M be/src/exec/data-sink.cc
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node.cc
M be/src/exec/plan-root-sink.cc
M be/src/exec/plan-root-sink.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/exec/scanner-context.cc
M be/src/runtime/CMakeLists.txt
R be/src/runtime/blocking-row-batch-queue.cc
A be/src/runtime/blocking-row-batch-queue.h
A be/src/runtime/non-blocking-row-batch-queue.cc
A be/src/runtime/non-blocking-row-batch-queue.h
M be/src/runtime/row-batch-queue.h
M be/src/util/blocking-queue.h
M tests/query_test/test_result_spooling.py
20 files changed, 464 insertions(+), 97 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/83/13883/7
-- 
To view, visit http://gerrit.cloudera.org:8080/13883
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
Gerrit-Change-Number: 13883
Gerrit-PatchSet: 7
Gerrit-Owner: Sahil Takiar <st...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sahil Takiar <st...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>