Posted to reviews@impala.apache.org by "Rahul Shivu Mahadev (Code Review)" <ge...@cloudera.org> on 2018/07/13 00:32:18 UTC

[Impala-ASF-CR] IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.

Rahul Shivu Mahadev has uploaded this change for review. ( http://gerrit.cloudera.org:8080/10937 )


Change subject: IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.
......................................................................

IMPALA-3825: [WIP]Distributed RuntimeFiltering
Work in progress patch for Distributing runtime filtering
aggregation.

Change-Id: I0289ee39787d8e9c13a16c3d7d64f471bc554536
---
M be/src/common/global-flags.cc
M be/src/runtime/CMakeLists.txt
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
A be/src/runtime/filter-state.cc
A be/src/runtime/filter-state.h
M be/src/runtime/query-exec-mgr.cc
M be/src/runtime/query-exec-mgr.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter.inline.h
M be/src/service/impala-server.cc
M bin/bootstrap_toolchain.py
M common/thrift/ImpalaInternalService.thrift
18 files changed, 738 insertions(+), 72 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/37/10937/1
-- 
To view, visit http://gerrit.cloudera.org:8080/10937
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0289ee39787d8e9c13a16c3d7d64f471bc554536
Gerrit-Change-Number: 10937
Gerrit-PatchSet: 1
Gerrit-Owner: Rahul Shivu Mahadev <ra...@cloudera.com>

[Impala-ASF-CR] IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.

Posted by "Rahul Shivu Mahadev (Code Review)" <ge...@cloudera.org>.
Rahul Shivu Mahadev has posted comments on this change. ( http://gerrit.cloudera.org:8080/10937 )

Change subject: IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.
......................................................................


Patch Set 1:

(9 comments)

Comments addressed in new change https://gerrit.cloudera.org/#/c/11055/

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/common/global-flags.cc
File be/src/common/global-flags.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/common/global-flags.cc@132
PS1, Line 132: DEFINE_bool(enable_distributed_filter_aggregation, true,
             :     "Enables aggregation of filters "
             :     "in a distributed manner, set to false to revert to coordinator based aggregation");
> After looking through the code, it seems better to avoid having this flag. 
I will keep the flag until testing is done.


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@93
PS1, Line 93: x.second.ToThrift(&tfs);
> Move inside if(), since it's only needed in that case.
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@94
PS1, Line 94:     AggregatorRoutingTable::const_iterator it = aggregator_routing_table.find(x.first);
            :     if (it != aggregator_routing_table.end()) {
            :       tfs.__set_aggregator_address(it->second);
            :       rpc_params->filter_routing_table.insert(
            :           std::pair<int32_t, TFilterState>(x.first, tfs));
            :     }
> Wouldn't all filters that are present in 'filter_routing_table' be present 
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@101
PS1, Line 101: 
> We also don't need to send the filter routing table to the nodes that don't
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator.cc@284
PS1, Line 284: 
> We can set the aggregator address in the FilterState here, and not have the
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-exec-mgr.h
File be/src/runtime/query-exec-mgr.h:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-exec-mgr.h@22
PS1, Line 22: #include <memory>
> None of these new #includes should be needed since you haven't added any ne
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-state.cc@559
PS1, Line 559: 
> Add a TODO here to call PublishFilterFromAggregator() asynchronously using 
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/runtime-filter-bank.cc@205
PS1, Line 205:       SendFilterToAggregator(
> What if the aggregator and the producer are on the same node? We can consid
Done


http://gerrit.cloudera.org:8080/#/c/10937/1/bin/bootstrap_toolchain.py
File bin/bootstrap_toolchain.py:

http://gerrit.cloudera.org:8080/#/c/10937/1/bin/bootstrap_toolchain.py@430
PS1, Line 430: "flatbuffers", "gcc", "gflags", "glog",
> I'm guessing this was a mistake?
Done




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I0289ee39787d8e9c13a16c3d7d64f471bc554536
Gerrit-Change-Number: 10937
Gerrit-PatchSet: 1
Gerrit-Owner: Rahul Shivu Mahadev <ra...@cloudera.com>
Gerrit-Reviewer: Rahul Shivu Mahadev <ra...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-Comment-Date: Thu, 26 Jul 2018 23:22:21 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.

Posted by "Sailesh Mukil (Code Review)" <ge...@cloudera.org>.
Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/10937 )

Change subject: IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.
......................................................................


Patch Set 1:

(9 comments)

I added some high-level comments. After addressing them, you can clean up the code so we can do a second round of reviews.

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/common/global-flags.cc
File be/src/common/global-flags.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/common/global-flags.cc@132
PS1, Line 132: DEFINE_bool(enable_distributed_filter_aggregation, true,
             :     "Enables aggregation of filters "
             :     "in a distributed manner, set to false to revert to coordinator based aggregation");
After looking through the code, it seems better to avoid having this flag. Let's get rid of coordinator-filter-state.* completely, and go with the new approach you've done. When we need to compare the two approaches for benchmarking purposes, we can use two branches.


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@93
PS1, Line 93: x.second.ToThrift(&tfs);
Move inside if(), since it's only needed in that case.
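
A minimal sketch of this suggestion, for illustration only. The types below are simplified, hypothetical stand-ins (the real FilterState and TFilterState carry more state), and the to_thrift_calls counter exists only to show that the conversion now runs for matching filters alone:

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the Thrift struct sent to backends.
struct TFilterState {
  std::string aggregator_address;
};

// Hypothetical stand-in for the coordinator-side filter state.
struct FilterState {
  int to_thrift_calls = 0;  // Counts conversions, for illustration only.
  void ToThrift(TFilterState* out) {
    ++to_thrift_calls;
    *out = TFilterState();
  }
};

using AggregatorRoutingTable = std::map<int32_t, std::string>;

// Builds the routing table for one backend. A filter is converted to its
// Thrift form only when an aggregator has been assigned to it.
std::map<int32_t, TFilterState> BuildRoutingTable(
    std::map<int32_t, FilterState>& filters,
    const AggregatorRoutingTable& aggregator_routing_table) {
  std::map<int32_t, TFilterState> result;
  for (auto& x : filters) {
    auto it = aggregator_routing_table.find(x.first);
    if (it != aggregator_routing_table.end()) {
      TFilterState tfs;
      x.second.ToThrift(&tfs);  // Moved inside the if().
      tfs.aggregator_address = it->second;
      result.emplace(x.first, tfs);
    }
  }
  return result;
}
```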


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@94
PS1, Line 94:     AggregatorRoutingTable::const_iterator it = aggregator_routing_table.find(x.first);
            :     if (it != aggregator_routing_table.end()) {
            :       tfs.__set_aggregator_address(it->second);
            :       rpc_params->filter_routing_table.insert(
            :           std::pair<int32_t, TFilterState>(x.first, tfs));
            :     }
Wouldn't all filters that are present in 'filter_routing_table' be present in 'aggregator_routing_table'?

If that's the case, we're doing the same work multiple times here (once per BackendState), and we would probably be better off not having the 'aggregator_routing_table' at all and just setting the aggregator address in 'FilterState' as part of InitFilterRoutingTable().
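
A rough sketch of what that could look like, under stated assumptions: FilterState here is a hypothetical simplified struct, and the round-robin choice of aggregator is invented for illustration (this thread does not say how aggregators are picked). The point is only that the address is assigned once, when the routing table is initialized, so no per-backend lookup in a second table is needed:

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical simplified filter state that carries its aggregator address.
struct FilterState {
  std::string aggregator_address;
};

// Assigns an aggregator to every filter exactly once.
// Assumption: aggregators are picked round-robin over the backend list.
void InitFilterRoutingTable(std::map<int32_t, FilterState>* table,
                            const std::vector<std::string>& backends) {
  if (backends.empty()) return;
  std::size_t next = 0;
  for (auto& entry : *table) {
    entry.second.aggregator_address = backends[next % backends.size()];
    ++next;
  }
}
```

Each BackendState can then read the address straight from the filter state when it serializes the table.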


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator-backend-state.cc@101
PS1, Line 101: 
We also don't need to send the filter routing table to the nodes that don't aggregate or don't produce, right? If it's not simple to do, you can leave a TODO for now.
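
One possible shape for this, as a sketch only. The FilterState fields and the RoutingTableFor() helper are hypothetical names, not part of the patch. A backend receives only the entries for filters it aggregates or produces, and an empty result means nothing needs to be sent to it:

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <string>

// Hypothetical simplified filter state.
struct FilterState {
  std::string aggregator_address;
  std::set<std::string> producer_addresses;
};

// Returns only the routing entries that matter to 'backend': filters it
// aggregates or filters it produces.
std::map<int32_t, FilterState> RoutingTableFor(
    const std::string& backend, const std::map<int32_t, FilterState>& all) {
  std::map<int32_t, FilterState> relevant;
  for (const auto& entry : all) {
    const FilterState& fs = entry.second;
    if (fs.aggregator_address == backend ||
        fs.producer_addresses.count(backend) > 0) {
      relevant.insert(entry);
    }
  }
  return relevant;
}
```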


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/coordinator.cc@284
PS1, Line 284: 
We can set the aggregator address in the FilterState here, and not have the aggregator_routing_table.


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-exec-mgr.h
File be/src/runtime/query-exec-mgr.h:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-exec-mgr.h@22
PS1, Line 22: #include <memory>
None of these new #includes should be needed since you haven't added any new code to this file.


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/query-state.cc@559
PS1, Line 559: 
Add a TODO here to call PublishFilterFromAggregator() asynchronously using a thread pool.


http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/10937/1/be/src/runtime/runtime-filter-bank.cc@205
PS1, Line 205:       SendFilterToAggregator(
What if the aggregator and the producer are on the same node? We can consider short-circuiting that path. Add a TODO for that.
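
The short-circuit could be as simple as an address comparison before the send. This is a sketch with hypothetical names (FilterDelivery, ChooseDelivery), and it assumes both addresses are available as comparable strings at the call site:

```cpp
#include <string>

// How a producer hands a filter to its aggregator.
enum class FilterDelivery { kLocal, kRpc };

// Picks the local path when producer and aggregator share an address, so
// the filter can be merged in-process instead of going through an RPC.
FilterDelivery ChooseDelivery(const std::string& producer_address,
                              const std::string& aggregator_address) {
  return producer_address == aggregator_address ? FilterDelivery::kLocal
                                                : FilterDelivery::kRpc;
}
```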


http://gerrit.cloudera.org:8080/#/c/10937/1/bin/bootstrap_toolchain.py
File bin/bootstrap_toolchain.py:

http://gerrit.cloudera.org:8080/#/c/10937/1/bin/bootstrap_toolchain.py@430
PS1, Line 430: "flatbuffers", "gcc", "gflags", "glog",
I'm guessing this was a mistake?




Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I0289ee39787d8e9c13a16c3d7d64f471bc554536
Gerrit-Change-Number: 10937
Gerrit-PatchSet: 1
Gerrit-Owner: Rahul Shivu Mahadev <ra...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-Comment-Date: Thu, 19 Jul 2018 16:42:32 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.

Posted by "Tim Armstrong (Code Review)" <ge...@cloudera.org>.
Tim Armstrong has abandoned this change. ( http://gerrit.cloudera.org:8080/10937 )

Change subject: IMPALA-3825: [WIP]Distributed RuntimeFiltering Work in progress patch for Distributing runtime filtering aggregation.
......................................................................


Abandoned

https://gerrit.cloudera.org/#/c/11055/

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: abandon
Gerrit-Change-Id: I0289ee39787d8e9c13a16c3d7d64f471bc554536
Gerrit-Change-Number: 10937
Gerrit-PatchSet: 1
Gerrit-Owner: Rahul Shivu Mahadev <ra...@cloudera.com>
Gerrit-Reviewer: Rahul Shivu Mahadev <ra...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@apache.org>