You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@impala.apache.org by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org> on 2019/07/19 06:27:42 UTC

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Fang-Yu Rao has uploaded this change for review. ( http://gerrit.cloudera.org:8080/13882


Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC w/o sidecar.

Testing:
1. Have tested this patch set using those EE cases in test_runtime_filters.py
on my local dev box.
2. Have tested this patch set using those BE tests in bloom-filter-test.cc
on my local dev box.

TODO:
Incorporate sidecar.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 770 insertions(+), 684 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/2
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 2
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 19:

(26 comments)

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/rpc/thrift-server-test.cc
File be/src/rpc/thrift-server-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/rpc/thrift-server-test.cc@22
PS19, Line 22: #include "gen-cpp/ImpalaInternalService.h"
Why is this necessary?


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@572
PS19, Line 572: rpc_status.message().ToString()
I think you can just call ToString() directly on the kudu::Status without calling .message()


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@573
PS19, Line 573: controller.Reset();
Not needed, since we don't reuse the controller


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@576
PS19, Line 576: res.status().error_msgs(0)
We'll want to print the status code and other error messages. Easiest way is to just construct a regular Status object from the StatusPB, i.e. replace this with something like "Status(res.status()).GetDetail()"

That's of course less efficient, but this shouldn't happen very often


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.h@44
PS19, Line 44: class RpcController;
not used


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1070
PS19, Line 1070: if (rpc_params.has_bloom_filter()) {
               :         if (!rpc_params.bloom_filter().always_false()
               :             && !rpc_params.bloom_filter().always_true()) {
combine these with '&&'


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1107
PS19, Line 1107: } else if (!params->bloom_filter().always_false()) {
probably more straightforward to flip the conditions, i.e. put the always_false() block here and put this block in the 'else'


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1118
PS19, Line 1118: return;
I don't think we want to just return here - we still need to hit the "if (pending_count_ == 0..." below


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1122
PS19, Line 1122: // There are two cases in which 'bloom_filter_.always_false()'
               :         // evaluates to true: (1) 'bloom_filter_' has never been initialized
               :         // by any incoming Bloom filter previously, (2) 'bloom_filter_' has
               :         // been initialized to an always false Bloom filter by an incoming
               :         // Bloom filter previously. We note that
               :         // 'bloom_filter_.has_log_bufferpool_space()' evaluates to false when
               :         // 'bloom_filter_' has never been initialized by any incoming Bloom
               :         // filter and to true otherwise.
               :         // In either case, we will always perform the swap operation.
I think this whole comment is unnecessary


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1141
PS19, Line 1141: // We want to move the payload from the request rather than copy it and
               :           // take double the memory cost.
this can be removed


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1157
PS19, Line 1157: non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter());
might as well put this is in the 'if' for clarity


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1158
PS19, Line 1158: // Even if the current incoming Bloom filter is an always false filter,
               :       // we still need to perform the swap operation to properly initialize
               :       // the field of 'log_bufferpool_space_' if 'bloom_filter_' has not been
               :       // initialized by any incoming Bloom filter yet. Otherwise we will hit a
               :       // DCHECK error in 'buffer-allocator.cc' that requires that the buffer
               :       // length of the directory associated with the Bloom filter to be
               :       // published is greater than some lower bound when attempting to call
               :       // BloomFilter::Init() in RuntimeFilterBank::PublishGlobalFilter().
Unnecessarily long comment. Lets just put a comment inside the 'if' that says something like 'bloom_filter_ hasn't been initialized yet, so do so now'


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1167
PS19, Line 1167: // We want to move the payload from the request rather than copy it and
               :         // take double the memory cost.
This can be removed. It was referring to swapping the directory to avoid a large copy, but the directory isn't being swapped here.


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/data-stream-test.cc
File be/src/runtime/data-stream-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/data-stream-test.cc@127
PS19, Line 127: 
nit: extra whitespace. You can just remove all of it and do '{}'


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/fragment-instance-state.h
File be/src/runtime/fragment-instance-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/fragment-instance-state.h@48
PS19, Line 48: using kudu::rpc::RpcContext;
don't use 'using' in headers


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/krpc-data-stream-recvr.cc@469
PS19, Line 469: LOG(INFO) << "KrpcDataStreamRecvr::SenderQueue::AddBatch(), "
              :           "rpc_context->GetTransferSize(): " << rpc_context->GetTransferSize();
I'm guessing you did this for debugging? It should be removed


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/krpc-data-stream-recvr.cc@569
PS19, Line 569:     LOG(INFO) << "KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender, "
same here


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/query-state.h@45
PS19, Line 45: using kudu::rpc::RpcContext;
don't use using in header files


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/runtime-filter-bank.h@156
PS19, Line 156: num_inflight_rpcs_
nit: when referring to variables in comments, we put single quotes around them


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/runtime-filter-bank.cc@121
PS19, Line 121: DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
Brief comment about why this is correct, i.e. "UpdateFilter should never set an error status"


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/data-stream-service.h
File be/src/service/data-stream-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/data-stream-service.h@23
PS19, Line 23: //#include "kudu/rpc/rpc_controller.h"
remove


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/data-stream-service.h@71
PS19, Line 71: ::
nit: remove this, here and elsewhere, to be consistent with the rest of this file


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/data-stream-service.cc@137
PS19, Line 137: LOG(ERROR)
I think this isn't necessarily an error, eg. it can happen if the query was cancelled, so lets do LOG(INFO)


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/service/impala-server.h@60
PS19, Line 60: using kudu::rpc::RpcContext;
don't use 'using' in header files


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/bloom-filter.h@43
PS19, Line 43: class Coordinator;
not used


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h
File be/src/util/min-max-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h@22
PS19, Line 22: #include "gen-cpp/control_service.pb.h"
shouldn't this be 'data_stream_service'?



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 19
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 17 Sep 2019 21:50:43 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

A quick high level reminder about using sidecar for the filter:

- the RPC framework assumes the lifetime of the binary buffer referenced by the sidecar is valid till the end of an RPC. If we used to copy the entire filter as a string into the Thrift data structure, it's okay for the filter to disappear / freed before the RPC completes. However, with RPC sidecar, we need to keep the filter alive until the RPC is done.

- the KRPC also implements the functionality to cancel an asynchronous in-flight RPC. Not that it's necessarily needed in this patch but that functionality allows the RPC to end early so that the binary blob referred to by sidecar can be freed once the completion callback is invoked.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 05 Aug 2019 18:18:59 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 31: Verified+1


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 31
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Sat, 09 Nov 2019 01:54:50 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 11:

(21 comments)

I have addressed most of Thomas' previous comments. The current patch set passed the exhaustive private parameterized test as well. I will be working on incorporating sidecar now. Thanks.

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h@97
PS3, Line 97:   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
> remove this
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/runtime/runtime-filter-bank.cc@224
PS2, Line 224: 
> line too long (93 > 90)
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.h@1
PS2, Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> This whole file can be removed
This file is kept in this patch set but will be removed in a follow up patch according to our discussion.


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.cc@1
PS2, Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> This whole file can be removed
This file is kept in this patch set but will be removed in a follow up patch according to our discussion.


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h@601
PS2, Line 601:     /// the frontend.
> Done
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter-test.cc@31
PS2, Line 31: #include "gen-cpp/data_stream_service.pb.h"
> This can be moved up with the includes above it
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter.cc@208
PS2, Line 208: _directory()))[0]));
> I don't think that you can use string::data() like this, see: http://www.cp
Thanks for pointing this out! I have modified this expression according to your suggestion.


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter-test.cc
File be/src/util/min-max-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter-test.cc@28
PS2, Line 28: 
> This can be put with the block of includes right above it
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc
File be/src/util/min-max-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@378
PS2, Line 378:     if (in_min_val < out_min_val) {
> I don't remember if we talked about clang-format-diff yet, but if not you m
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@381
PS2, Line 381:     TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
> line too long (98 > 90)
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@404
PS2, Line 404: // Construct the Decimal min-max filter when the min-max filter information
> line too long (94 > 90)
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@507
PS2, Line 507:     if (Deci
> I think this was unintentional?
Thanks for pointing this out! I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/control_service.proto
File common/protobuf/control_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/control_service.proto@274
PS2, Line 274: }
> please omit this from the patch
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@82
PS2, Line 82:   // Log_2 of the bufferpool space required for this filter.
> This can be removed (and the corresponding fields in the messages below).
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@89
PS2, Line 89: optional
> Lets make everything optional
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@103
PS2, Line 103: 
> Put this in common.proto
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@127
PS2, Line 127: }
> Remove this.
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@133
PS2, Line 133: 
> remove this, here and below
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@150
PS2, Line 150:   // Latency for response in the receiving daemon in nanoseconds.
> copy the comment from the equivalent field above
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@196
PS2, Line 196: 
> copy over the comments that were with these in ImpalaInternalService.thrift
Done


http://gerrit.cloudera.org:8080/#/c/13882/2/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/thrift/ImpalaInternalService.thrift@705
PS2, Line 705: service ImpalaInternalService {
             : 
             : }
> This can be removed
This service is kept in this patch set but will be removed in a follow up patch according to our discussion.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 11
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 25 Jul 2019 15:22:11 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.cc@229
PS12, Line 229: void BloomFilter::Or(const BloomFilterPB& in, BloomFilterPB* out) {
I suspect you may have kept the 'directory' field in BloomFilterPB because its sort of difficult to see how to implement a function like this if the data is in a sidecar.

I think the solution is that we'll just need to treat the bloom filter as two objects: the BloomFilterPB and a string.

Then, in coordinator-filter-state in addition to 'bloom_filter_' field we can add something like a 'bloom_filter_directory_' field, which will be either a string or possibly a kudu::Slice or kudu::faststring depending on what's convenient, and we can maintain both of those in FilterState::ApplyUpdate().

This function will then take four params: BloomFilterPB* in, string directory_in, BloomFilterPB* out, string* directory_out



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 02 Aug 2019 17:27:25 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 3:

Build Failed 

https://jenkins.impala.io/job/gerrit-code-review-checks/3922/ : Initial code review checks failed. See linked job for details on the failure.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 3
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 19 Jul 2019 18:05:44 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Reviewed-on: http://gerrit.cloudera.org:8080/13882
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,094 insertions(+), 757 deletions(-)

Approvals:
  Impala Public Jenkins: Looks good to me, approved; Verified

-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 32
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 5:

(23 comments)

I have addressed most of Thomas' previous comments. Will be currently working on incorporating sidecar. Thanks.

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/decimal-value.h
File be/src/runtime/decimal-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/decimal-value.h@27
PS3, Line 27: #include "runtime/types.h"
> alphabetize
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/query-state.h@36
PS3, Line 36: 
> put this with the includes above
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h@31
PS3, Line 31: #include <boost/unordered_map.hpp>
> put this up with the other impala-internal includes above
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@24
PS3, Line 24: 
> remove this
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@25
PS3, Line 25: #include "gen-cpp/data_stream_service.proxy.h"
> whitespace can be removed
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@43
PS3, Line 43: 
            : #include "common/names.h"
            : 
> move these up with the includes above
Thanks for the explanation for common/names.h!


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@110
PS3, Line 110:   Status get_proxy_status =
> move this declaration down to just above where its actually used
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@122
PS3, Line 122: UpdateFilterResult
> You can just declare this on the same line its set, i.e. 'kudu::Status rpc_
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@125
PS3, Line 125: < "UpdateFilte
> since we're not returning this status or anything, we can just keep it as a
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@127
PS3, Line 127: 
> Lets say something like "UpdateFilter() failed: "
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@128
PS3, Line 128: 
> unneccesary
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@137
PS3, Line 137: bool has
> should just be 4 spaces like it was before
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@169
PS3, Line 169:  (
> remove this, the formatting was already right
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@170
PS3, Line 170:   BloomFilter::ToProtobuf(bloom_fil
> instead of using 'new' here and 'set_allocated' below, it would be cleaner 
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@182
PS3, Line 182:  RuntimeFilterB
> const TNetworkAddress&
Thanks for pointing this out!


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@184
PS3, Line 184:  (closed
> 4 spaces
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@208
PS3, Line 208:   bloom_filters_.push_back(bloom_filter);
> remove this
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h@176
PS3, Line 176:   void ToColumnValuePB(ColumnValuePB* pvalue) const {
> copy the comment from ToTColumnValue
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h@189
PS3, Line 189: value_p
> typo
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.h
File be/src/service/data-stream-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.h@66
PS3, Line 66:   /// Called by fragment instances that produce local runtime filters to deliver them to
> Could you add some brief comments on these? You can just copy the comments 
Thanks for the suggestion. It is done.


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc@36
PS3, Line 36: #include "util/memory-metrics.h"
            : #include "util/parse-util.h"
            : #include "testutil/fault-injection-util.h"
            : #include "service/impala-server.h"
            : 
> nit: all of these includes can be alphabetized into the same block
Done


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc@132
PS3, Line 132: ) retu
> I think you'll also need to call RespondRpc() in this case. (you can flip t
Thanks for pointing this out!

When "qs.get() == nullptr", I was wondering if we need to return some error message to the client side of this data stream service, e.g., "Status("qs.get() == nullptr")" or we need to define some TErrorCode and then return it.


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h@601
PS2, Line 601:   friend class ControlService;
> remove this change
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 5
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Sat, 20 Jul 2019 19:24:33 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#3). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC w/o sidecar.

Testing:
1. Have tested this patch set using those EE cases in test_runtime_filters.py
on my local dev box.
2. Have tested this patch set using those BE tests in bloom-filter-test.cc
on my local dev box.

TODO:
Incorporate sidecar.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 769 insertions(+), 686 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/3
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 3
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 2:

Build Failed 

https://jenkins.impala.io/job/gerrit-code-review-checks/3914/ : Initial code review checks failed. See linked job for details on the failure.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 2
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 19 Jul 2019 06:54:29 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 11:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h
File be/src/runtime/backend-client.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@19
PS11, Line 19: #define IMPALA_BACKEND_CLIENT_H
Should this file be deleted after this change ?


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/data-stream-service.cc@136
PS11, Line 136: "qs.get() == nullptr";
Is it a case in which it's legitimate for query state to be not found here ?

This error message could be clearer:

    Query State not found for <query_id>.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.h@18
PS11, Line 18: #ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
Shouldn't this file be deleted after this change ?


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.cc@36
PS11, Line 36: ImpalaInternalService::ImpalaInternalService() {
Same question. Shouldn't this file be deleted after this change ? Also, we can also skip initializing the backend service Thrift server during Impala initialization.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-server.h@342
PS11, Line 342: /// ImpalaInternalService rpcs
Stale comment. ImpalaInternalService should not exist anymore after this change.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/util/bloom-filter.h@79
PS11, Line 79: Thrift representation.
Stale comment



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 11
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 01 Aug 2019 20:30:50 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 11:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/3997/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 11
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 25 Jul 2019 16:00:27 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 27: Code-Review+2

(19 comments)

LGTM. Please address the comments below.

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/benchmarks/bloom-filter-benchmark.cc
File be/src/benchmarks/bloom-filter-benchmark.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/benchmarks/bloom-filter-benchmark.cc@290
PS27, Line 290:  std::shared_ptr
No need for shared_ptr


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: {
> Thanks for pointing this out.
Makes sense.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072:       bloom_filter_directory.swap(state->bloom_filter_directory());
               :       DCHECK(rpc_params.bloom_filter().always_false()
> Thanks for the suggestion!
I guess it can simply be

*rpc_params.mutable_bloom_filter() = state->bloom_filter();


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074:     || rpc_params.bloo
> Thanks for the suggestion.
Missed the part about the scope of state. Think it's fine to keep it as-is. Thanks for the explanation.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098: 
> Since 'state' is defined inside of the critical section as mentioned above,
Makes sense.


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/coordinator.cc@1096
PS27, Line 1096: reinterpret_cast<const char*>(&(bloom_filter_directory[0])),
               :             static_cast<unsigned long>(bloom_filter_directory.size())
Please see comments at BloomFilter::AddDirectorySidecar(). We can pass in the string directly instead.


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/runtime-filter-bank.cc@187
PS27, Line 187: DCHECK(
nit: DCHECK_EQ


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
> Thanks for this suggestion.
Looks like it's more involved than expected. Please feel free to defer it to the follow-up patch which removes ImpalaInternalService altogether.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TSessionState;
             : class TQueryOptions;
> Thanks for pointing this out!
Sounds good.


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@a68
PS27, Line 68: 
             : 
             : 
Why not keep most of this header comment which explains what this test does and talks about the output arguments (e.g. success, protobuf, directory)


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@103
PS27, Line 103: *success = directory_y.compare(directory_y2) == 0 ? true : false;
*success = directory_y.compare(directory_y2) == 0;


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@365
PS27, Line 365: std::shared_ptr<
No need for shared_ptr


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@367
PS27, Line 367: 
nit: unnecessary blank line


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@395
PS27, Line 395:  TBloomFilter
BloomFilterPB


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.h@95
PS27, Line 95: const uint8_t* directory_in,
             :       size_t directory_in_size)
Can this interface take a string only instead for the directory ? Can we use directory.size() to get the size in Init() or are there cases in which passing a string won't work ?


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc@79
PS27, Line 79: protobuf
rpc_params


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc@80
PS27, Line 80: const char* directory,
             :     unsigned long directory_size
See comments below. This could be const string& directory. I believe faststring::append() also has an interface for taking string as input.


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc@85
PS27, Line 85:   unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
             :   sidecar_buf->append(directory, directory_size);
             :   unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
             :       kudu::rpc::RpcSidecar::FromFaststring(std::move(sidecar_buf))
Would like to point out an alternative would be to use a slice instead of a faststring. This avoids the need to copy into the faststring buffer but directory needs to be around until the RPC is done. On the other hand, keeping the code as-is leaves a cleaner interface so this may actually be better. If you choose to use the option of using a slice, please make sure to highlight this side-effect (i.e. directory needs to be alive until the RPC is done).

    kudu::Slice dir_slice(directory);
    kudu::rpc::RpcSidecar::FromSlice(dir_slice);


http://gerrit.cloudera.org:8080/#/c/13882/27/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/27/common/protobuf/data_stream_service.proto@91
PS27, Line 91:   // The sidecar index associated with the directory of a Bloom filter.
             :   // A directory is a list of buckets representing the Bloom Filter contents,
             :   // laid out contiguously in one string for efficiency of (de)serialisation.
             :   // See BloomFilter::Bucket and BloomFilter::directory_.
             :   optional int32 directory_sidecar_idx = 4;
Since BloomFilterPB is also used as the representation of the aggregated bloom filter in the coordinator, does it make more sense to store the directory_sidecar_idx in UpdateFilterParamsPB instead ?



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 27
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 01 Nov 2019 23:52:47 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#14). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly.

TODO:
To remove unnecessary code related to ImpalaInternalService.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 1,063 insertions(+), 728 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/14
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 19:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4547/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 19
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 12 Sep 2019 00:35:40 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 3:

(23 comments)

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/decimal-value.h
File be/src/runtime/decimal-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/decimal-value.h@27
PS3, Line 27: #include "gen-cpp/data_stream_service.pb.h"
alphabetize


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/query-state.h@36
PS3, Line 36: #include "gen-cpp/data_stream_service.pb.h"
put this with the includes above


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h@31
PS3, Line 31: #include "gen-cpp/data_stream_service.pb.h"
put this up with the other impala-internal includes above


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.h@97
PS3, Line 97:   UniqueIdPB* convertUniqueId(const TUniqueId& tqid);
remove this


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@24
PS3, Line 24: // need this to invoke get proxy
remove this


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@25
PS3, Line 25: 
whitespace can be removed


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@43
PS3, Line 43: #include "service/control-service.h"
            : #include "service/data-stream-service.h"
            : #include "exec/kudu-util.h"
move these up with the includes above

common/names.h has a bunch of 'using's in it, so we always import it last so they don't affect the other files being imported


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@110
PS3, Line 110:   UpdateFilterResultPB res;
move this declaration down to just above where its actually used


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@122
PS3, Line 122: Status rpc_status;
You can just declare this on the same line its set, i.e. 'kudu::Status rpc_status = proxy->...' on line 125


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@125
PS3, Line 125: FromKuduStatus
since we're not returning this status or anything, we can just keep it as a kudu::Status. It will still support 'ok()', for the logging you can use 'ToString()'


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@127
PS3, Line 127: proxy->UpdateFilterPB not ok: 
Lets say something like "UpdateFilter() failed: "


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@128
PS3, Line 128: return;
unneccesary


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@137
PS3, Line 137:         
should just be 4 spaces like it was before


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@169
PS3, Line 169:   
remove this, the formatting was already right


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@170
PS3, Line 170: UniqueIdPB* qidpb = new UniqueIdPB;
instead of using 'new' here and 'set_allocated' below, it would be cleaner to pass params.mutable_query_id() in as the second param to TUniqueIdToUniqueIdPB()


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@182
PS3, Line 182: TNetworkAddress
const TNetworkAddress&

(saves a copy)


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@184
PS3, Line 184:         
4 spaces


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/runtime-filter-bank.cc@208
PS3, Line 208: // memory will be allocated inside InitPB
remove this


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h@176
PS3, Line 176:   void ToColumnValuePB(ColumnValuePB* pvalue) const {
copy the comment from ToTColumnValue


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/runtime/timestamp-value.h@189
PS3, Line 189: valuepb
typo


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.h
File be/src/service/data-stream-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.h@66
PS3, Line 66:   virtual void UpdateFilter(const UpdateFilterParamsPB *req,
Could you add some brief comments on these? You can just copy the comments that were removed from ImpalaInternalService.thrift if you want


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc@36
PS3, Line 36: #include "service/impala-server.h"
            : 
            : #include "gen-cpp/data_stream_service.pb.h"
            : #include "gen-cpp/data_stream_service.proxy.h"
            : #include "runtime/query-state.h"
nit: all of these includes can be alphabetized into the same block


http://gerrit.cloudera.org:8080/#/c/13882/3/be/src/service/data-stream-service.cc@132
PS3, Line 132: return
I think you'll also need to call RespondRpc() in this case. (you can flip the check to only call PublishFilter() if qs != nullptr and then only have one call to RespondRpc())



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 3
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 19 Jul 2019 23:52:03 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 20:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4586/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 20
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 18 Sep 2019 16:49:53 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(6 comments)

Hi Michael and Thomas, I have addressed some of the comments in the previous iteration. There are still two unresolved comments in runtime-filter-bank.cc. Please let me know your thoughts on my replies. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
> Why not std::move(sidecar_slice.ToString()) ?
Thanks for pointing this out! I have replaced the original statement with 'std::move(sidecar_slice.ToString())' to make it more elegant.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
> Can you please double check if the parameters need to be preserved beyond t
Thank you for pointing this out! You are correct. 'params' can be freed once the asynchronous RPC call is done. I have replaced the original statement with 'std::unique_ptr<UpdateFilterParamsPB> params = std::make_unique<UpdateFilterParamsPB>();' and revised the following code accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new RpcController);
> I wonder if we can keep these in thread local storage and initialize them o
Thank you very much for the suggestion!

After briefly taking a look at the related sequence of calls, I found that 
starting from QueryState::ExecFInstance() (which will call FragmentInstanceState::Exec()), we will reach PhjBuilder::FlushFinal(). In PhjBuilder::FlushFinal(), we have a function call to PhjBuilder::PublishRuntimeFilters(), which will make a call to RuntimeFilterBank::UpdateFilterFromLocal() for each FilterContext associated with this PhjBuilder (refer to https://github.com/apache/impala/blob/master/be/src/exec/partitioned-hash-join-builder.cc#L488-L507). 

Considering that RuntimeFilterBank::UpdateFilterFromLocal() is an asynchronous RPC now, it seems we are not able to reuse 'UpdateFilterResultPB' and 'UpdateFilterResultPB'.

Another possible solution is to create an instance of UpdateFilterResultPB and UpdateFilterResultPB here in UpdateFilterFromLocal() and then release the memory they occupy in RuntimeFilterBank::UpdateFilterCompleteCb().

I could probably miss something. Please let me know if I misunderstand anything.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              : 
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> Do you need to set closed_ to true before waiting for all in-flight RPCs to
Thanks very much for the comment.

After reading the related code paths, I think the case where a thread may sneak in and try to issue an RPC again after we break out of the critical section could not happen.

According to my current understanding, in the propagation stage of the runtime filter protocol, starting from FragmentInstanceState::Exec(), the thread executing this function will first call FragmentInstanceState::Open(), which will (1) call RuntimeFilterBank::AllocateScratchBloomFilter()/AllocateScratchMinMaxFilter() to allocate the memory space for the corresponding RuntimeFilter, and (2) call RuntimeFilterBank::UpdateFilterFromLocal() on the RuntimeFilter just allocated. After FragmentInstanceState::Open() returns, the same thread above will call FragmentInstanceState::Close(), which in turn will lead us to RuntimeFilterBank::Close().

When RuntimeFilterBank::Close() is called, the calling thread has already made all the RPC's it wants to perform (There could still be inflight RPC's but there won't be any new RPC's issued). No thread will attempt to make any RPC resulting from RuntimeFilterBank::UpdateFilterFromLocal() and then increment 'num_inflight_rpcs_' associated with this instance of RuntimeFilterBank. Hence the case where a thread sneaks in and try to issue RPC after leaving the critical section cannot happen.

We have also added a DCHECK for '!closed_' at the beginning of RuntimeFilterBank::UpdateFilterFromLocal() to empirically show that the RuntimeFilterBank::Close() (where 'closed_' will be set to true) will not be called until the calling thread has issued the all the RPC's it needs to issue (https://gerrit.cloudera.org/c/13882/24/be/src/runtime/runtime-filter-bank.cc#139).

Please let me know if the argument above makes sense since I could possibly have missed something.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@137
PS24, Line 137:  Substitute("Query State not found for query_id=$0",
              :         PrintId(ProtoToQueryId(req->dst_query_id()))
> This can be refactored into a single string object and reuse them here and 
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@141
PS24, Line 141: this->
> no need for this
Thanks for pointing this out! I have removed 'this->' in DataStreamService::PublishFilter() as well as in DataStreamService::UpdateFilter().



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 27 Sep 2019 22:05:21 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#15). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly.

TODO:
1. To add the logic that keeps the BloomFilter passed into UpdateFilterFromLocal()
alive before its corresponding KRPC call finishes.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 1,108 insertions(+), 723 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/15
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 15
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#17). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 1,155 insertions(+), 733 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/17
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 25:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4854/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 22 Oct 2019 22:32:27 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#29). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,094 insertions(+), 757 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/29
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 29
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 29: Code-Review+2

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h@162
PS29, Line 162: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
              :       kudu::rpc::RpcController* controller, const char* directory,
              :       unsigned long directory_size);
              :   static void AddDirectorySidecar(BloomFilterPB* rpc_params,
              :       kudu::rpc::RpcController* controller, const string& directory);
> I don't think its necessary to have two versions of this function. You shou
We talked offline and I was wrong



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 29
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 00:11:50 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 5:

Build Failed 

https://jenkins.impala.io/job/gerrit-code-review-checks/3934/ : Initial code review checks failed. See linked job for details on the failure.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 5
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Sat, 20 Jul 2019 20:14:46 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#20). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,066 insertions(+), 738 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/20
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 20
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#27). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,080 insertions(+), 760 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/27
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 27
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#19). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
41 files changed, 1,104 insertions(+), 736 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/19
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 19
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 18:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4498/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 18
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 09 Sep 2019 15:51:29 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 2:

(4 comments)

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/runtime/runtime-filter-bank.cc@224
PS2, Line 224:         params->min_max_filter(), it->second->type(), &obj_pool_, filter_mem_tracker_.get());
line too long (93 > 90)


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc
File be/src/util/min-max-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@378
PS2, Line 378:     if (in_min_val < out_min_val) out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
line too long (98 > 90)


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@381
PS2, Line 381:     if (in_max_val > out_max_val) out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
line too long (98 > 90)


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@404
PS2, Line 404:   : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(protobuf.always_false()) {
line too long (94 > 90)



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 2
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 19 Jul 2019 06:28:24 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4122/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 02 Aug 2019 06:10:45 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#22). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,072 insertions(+), 745 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/22
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 22
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

(7 comments)

Starting to really come together.

Some high level comments about the sidecar stuff

http://gerrit.cloudera.org:8080/#/c/13882/12//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13882/12//COMMIT_MSG@11
PS12, Line 11: Having incorporated sidecar on the aggregation stage when
             : the runtime filter is a Bloom filter.
When writing commit messages, you should be thinking about it from the perspective of someone who comes along weeks or months from now and needs to know what your patch does without having any context about the process you went through to produce the patch.

So, rather than phrasing it as "I've now updated the patch to use sidecars", which only makes sense to people who have the context that the patch was originally written without them, its better to phrase it as "This patch uses sidecars because...".

Or in other words, replace this with something like:
"To save on copying, the underlying data for bloom filters is transmitted via sidecar"


http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.h@97
PS12, Line 97:   static void ToProtobufWithSidecar(const BloomFilter* filter,
I don't think there's any reason we need to have both implementations - in practice we should only ever be using this version, so we can just eliminate the other version and name this one ToProtobuf()


http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.h@99
PS12, Line 99: UpdateFilterParamsPB* params
If you move 'sidecar_idx' into the BloomFilterPB, I think we can just take a BloomFilterPB* here instead


http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.cc@19
PS12, Line 19: #include "kudu/rpc/rpc_controller.h"
leave a blank line after the first import for the header file that directly corresponds to this file


http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.cc@59
PS12, Line 59: Status BloomFilter::Init(const BloomFilterPB& protobuf) {
I think this needs to be updated to handle the sidecar case


http://gerrit.cloudera.org:8080/#/c/13882/12/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/12/common/protobuf/data_stream_service.proto@89
PS12, Line 89:   optional bytes directory = 2;
Lets eliminate this field and only allow sending bloom filters with sidecars


http://gerrit.cloudera.org:8080/#/c/13882/12/common/protobuf/data_stream_service.proto@120
PS12, Line 120:   optional int32 sidecar_idx = 5;
Lets put this in BloomFilterPB, name it something like 'directory_sidecar_idx', and include a brief comment



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 02 Aug 2019 17:11:17 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(18 comments)

Hi Michael and Thomas, please review my revised patch. I have addressed all of Michael's comments in the previous iteration. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124
PS24, Line 124: string
> Please also document what this field stands for.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103
PS24, Line 1103:    // Workaround for fact that parameters are const& for Protobuf RPCs.
               :     BloomFilterPB* non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter());
> I don't think this makes sense anymore with the implementation using protob
Thanks! In this regard, I have used a copy constructor to initialize a BloomFilterPB instead of calling Swap().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108
PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) {
> May be I am missing the details here but why is it not a no-op if the incom
Thanks for pointing out this. There is a reason for this.

After taking a closer look at the code here and performing some tests, I found that there is a corner case we have to take care of when params->bloom_filter() is always_false. Specifically, I found that we need to initialize 'bloom_filter_' even if params->bloom_filter() is always_false when 'bloom_filter_' has never been initialized by any incoming Bloom filter previously. 

If 'bloom_filter_' has never been initialized, its field of 'log_bufferpool_space_' would be 0, which in turn will result in an error due to a DCHECK in 'buffer-allocator.cc' (https://github.com/apache/impala/blob/master/be/src/runtime/bufferpool/buffer-allocator.cc#L278) that requires that the buffer length of the directory associated with the Bloom filter to be published is greater than some lower bound when attempting to call BloomFilter::Init() in RuntimeFilterBank::PublishGlobalFilter().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110
PS24, Line 1110: bloom_filter_.Swap(non_const_filter);
> See comments above.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132
PS24, Line 1132:  bloom_filter_.Swap(non_const_filter);
> See comments above.
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
> Thanks for pointing this out! I have replaced the original statement with '
I actually found out that if we are using "std::move(sidecar_slice.ToString())", then we will not be able to build Impala with the asan flag due to the following error message, and thus for now I am using the previous approach.

"Moving a temporary object prevents copy elision."


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140
PS24, Line 1140: sidecar_slice.ToString(),
> Seems less expensive if we can pass the slice directly. This avoids doing a
Thanks for pointing this out! I have slightly changed the signature of BloomFilter::Or() to avoid this additional copy. Due to this change, I have also modified bloom-filter-test.cc and bloom-filter-benchmark.cc where BloomFilter::Or() is tested.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141
PS24, Line 1141: bloom_filter_directory_
> May need to do some const casting but please consider using: 
Indeed, because of the change to BloomFilter::Or(), I have to use the following statement to pass to BloomFilter::Or() the pointer to the beginning of the Bloom filter directory.

reinterpret_cast<uint8_t *>(const_cast<char *>(bloom_filter_directory_.data()))


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202
PS24, Line 202: const PublishFilterParamsPB*
> Why not keep it as const PublishFilterParamsPB& ?
Thanks for the comment. I have changed the type of 'params' to "const PublishFilterParamsPB&".

To be more specific, in DataStreamService::PublishFilter(), I pass a "const PublishFilterParamsPB&" to QueryState::PublishFilter() and have made the corresponding changes to the following functions:
1. FragmentInstanceState::PublishFilter(),
2. RuntimeFilterBank::PublishGlobalFilter().

The reason I used "const PublishFilterParamsPB*" is because in the automatically generated data_stream_service.service.cc, DataStreamServiceIf::DataStreamServiceIf() passes a "const PublishFilterParamsPB*" to its member function PublishFilter(), which is implemented by DataStreamService::PublishFilter().

QueryState::PublishFilter() is called in DataStreamService::PublishFilter() and therefore if the argument is of the type "const PublishFilterParamsPB*", we do not have to dereference in DataStreamService::PublishFilter() when calling QueryState::PublishFilter().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
> Or you can just allocate it on the stack as it isn't too large:
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new RpcController);
> Don't think it's safe to free the memory in the completion callback as the 
Thanks! I will keep them as is.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@196
PS24, Line 196: Couldn't send filter to coordinator: "
> Substitute("Failed to get proxy to coordinator $0: $1", host_address, get_p
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@240
PS24, Line 240: LOG(WARNING) << "Cannot get inbound sidecar: 
> LOG(ERROR) << "Failed to get bloom filter sidecar" << ...
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              : 
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> Actually, it's possible for RuntimeFilterBank::UpdateFilterFromLocal() and 
Thanks for the suggestion. I have added a comment accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@119
PS24, Line 119: req
> Per comments elsewhere, we usually pass const value by reference *req.
Thanks for pointing this out! I have changed the signature of ImpalaServer::UpdateFilter() accordingly.

Due to the change above, I have also made the corresponding changes to the following functions:
1. ClientRequestState::UpdateFilter(),
2. Coordinator::UpdateFilter(), and
3. Coordinator::FilterState::ApplyUpdate().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h@348
PS24, Line 348: const UpdateFilterParamsPB*
> Use const UpdateFilterParamsPB& params. See comments in data-stream-service
Done


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@218
PS24, Line 218: const string& directory_in
> Please see comments in coordinator.cc. Please consider passing in the slide
Thanks! I have changed the signature of BloomFilter::Or() to the following.

void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out, uint8_t* directory_out, size_t directory_size);


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@219
PS24, Line 219: string*
> This may be char* instead.
Done.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 14 Oct 2019 15:17:05 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 30:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/5194/ DRY_RUN=false


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 30
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 00:12:42 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 30: Verified-1

Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/5194/


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 30
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 04:42:48 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 2:

(17 comments)

I've still got a lot to go through, but wanted to post some comments for you to get started on

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.h@1
PS2, Line 1: // Licensed to the Apache Software Foundation (ASF) under one
This whole file can be removed


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-internal-service.cc@1
PS2, Line 1: // Licensed to the Apache Software Foundation (ASF) under one
This whole file can be removed


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/service/impala-server.h@601
PS2, Line 601: 
remove this change


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter-test.cc@31
PS2, Line 31: #include "gen-cpp/data_stream_service.pb.h"
This can be moved up with the includes above it


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/bloom-filter.cc@208
PS2, Line 208: out->mutable_directory()->data()
I don't think that you can use string::data() like this, see: http://www.cplusplus.com/reference/string/string/data/ In general its rare that using const_cast is the right thing to do

I think what we want instead is to do it the same way as before, using '[0]'.


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter-test.cc
File be/src/util/min-max-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter-test.cc@28
PS2, Line 28: #include "gen-cpp/data_stream_service.pb.h"
This can be put with the block of includes right above it


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc
File be/src/util/min-max-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@378
PS2, Line 378:     if (in_min_val < out_min_val) out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
> line too long (98 > 90)
I don't remember if we talked about clang-format-diff yet, but if not you might want to look into it (though I think it'll give you the wrong advice here - if you move the body of the 'if' to the next line, make sure to add '{' and '}' around it)


http://gerrit.cloudera.org:8080/#/c/13882/2/be/src/util/min-max-filter.cc@507
PS2, Line 507: #define DECI
I think this was unintentional?


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/control_service.proto
File common/protobuf/control_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/control_service.proto@274
PS2, Line 274: }
please omit this from the patch


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto
File common/protobuf/data_stream_service.proto:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@82
PS2, Line 82: enum ImpalaInternalServiceVersionPB {
This can be removed (and the corresponding fields in the messages below).

When we first did this, the idea of versioning the internal rpcs made sense, in case we wanted to support stuff like rolling upgrades or running impalads of different versions in the same cluster.

However, we haven't ever actually implemented anything like that (and probably won't any time soon) and it wouldn't be too hard to just add versions back in if we ever did want to, so we've just been omitting this when we port stuff over to krpc.


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@89
PS2, Line 89: required
Lets make everything optional


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@103
PS2, Line 103: message ColumnValuePB {
Put this in common.proto


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@127
PS2, Line 127: // UpdateFilter
Remove this.

Obviously you just copied this from ImpalaInternalService.thrift, but we don't have the same type of comment above the other messages already in this file, so its nice to be consistent (and a comment like this seems unnecessary anyways)


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@133
PS2, Line 133: // required in V1
remove this, here and below


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@150
PS2, Line 150:   optional int64 receiver_latency_ns = 2;
copy the comment from the equivalent field above


http://gerrit.cloudera.org:8080/#/c/13882/2/common/protobuf/data_stream_service.proto@196
PS2, Line 196:   rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
copy over the comments that were with these in ImpalaInternalService.thrift


http://gerrit.cloudera.org:8080/#/c/13882/2/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/13882/2/common/thrift/ImpalaInternalService.thrift@705
PS2, Line 705: service ImpalaInternalService {
             : 
             : }
This can be removed



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 2
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 19 Jul 2019 17:08:26 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 15:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4341/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 15
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 23 Aug 2019 04:19:53 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#12). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC with sidecar.

Having incorporated sidecar on the aggregation stage when
the runtime filter is a Bloom filter.

TODO:
To test this patch set using private parameterized test.

Testing:
This patch set has passed test_basic_filters and test_cache_cancellation
on my local dev box.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 879 insertions(+), 710 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/12
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 14:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4218/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 12 Aug 2019 15:39:49 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 27:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4904/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 27
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 30 Oct 2019 04:17:03 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 20: Code-Review+1

(4 comments)

I'll reach out to Michael and see if he wants to do a pass, otherwise I think that this can be +2ed once you address my last few comments.

Thanks again for all the work on this!

http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc@1113
PS20, Line 1113: DCHECK(!params->bloom_filter().always_false());
not really necessary


http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc@1126
PS20, Line 1126: if
I think you'll want to make this an 'else if' with the 'if' above


http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/runtime-filter-bank.cc@244
PS20, Line 244:       Status status = bloom_filter->Init(
There's a bug here - if we fail to get the sidecar above then bloom_filter will be nullptr, so we won't want to call Init() on it


http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h
File be/src/util/min-max-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h@22
PS19, Line 22: #include "impala-ir/impala-ir-functions.h"
> Thanks for catching this! I found that we do not need 'data_stream_service'
This is getting rather nit-picky of me, but just for your understanding: the goal isn't to have the smallest number of includes that will allow things to compile, the goal is to include exactly those things that are used in the file.

Clearly, there are things from data_stream_service that are used in this file now, eg. MinMaxFilterPB. Presumably you don't need to include it for things to compile because its already included in on of the other headers thats included here.

To be clear, I'm not saying that you need to go back through this whole patch and make sure your includes are exactly correct. If it compiles as is then that's fine.

The most important things is to not include things that aren't used, since that can lead to unnecessary re-compilation in some cases.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 20
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 18 Sep 2019 19:22:02 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(17 comments)

Sorry for the delays in reviews and thanks for the detailed replies. Please see some more comments below.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124
PS24, Line 124: string
Please also document what this field stands for.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103
PS24, Line 1103:    // Workaround for fact that parameters are const& for Protobuf RPCs.
               :     BloomFilterPB* non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter());
I don't think this makes sense anymore with the implementation using protobuf. The reason is that the old Thrift implementation embedded the entire string directory in it but in this new implementation, it's stored in the sidecar so we aren't saving much if anything by doing the swap trick below.

We can just initialize the BloomFilterPB by copying if necessary.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108
PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) {
May be I am missing the details here but why is it not a no-op if the incoming filter is always false ?


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110
PS24, Line 1110: bloom_filter_.Swap(non_const_filter);
See comments above.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132
PS24, Line 1132:  bloom_filter_.Swap(non_const_filter);
See comments above.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140
PS24, Line 1140: sidecar_slice.ToString(),
Seems less expensive if we can pass the slice directly. This avoids doing a copy from the slice's buffer into the string argument.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141
PS24, Line 1141: bloom_filter_directory_
May need to do some const casting but please consider using: 

  bloom_filter_directory_.data()


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202
PS24, Line 202: const PublishFilterParamsPB*
Why not keep it as const PublishFilterParamsPB& ?


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
> Thank you for pointing this out! You are correct. 'params' can be freed onc
Or you can just allocate it on the stack as it isn't too large:

   UpdateFilterParamsPB params;


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new RpcController);
> Thank you very much for the suggestion!
Don't think it's safe to free the memory in the completion callback as the KRPC stack may still have reference to it.

One way to share the parameters is to ensure there is one outstanding RPC per controller object but it's probably not worth the trouble so I am fine keep them as-is.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@196
PS24, Line 196: Couldn't send filter to coordinator: "
Substitute("Failed to get proxy to coordinator $0: $1", host_address, get_proxy_status.msg().msg());


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@240
PS24, Line 240: LOG(WARNING) << "Cannot get inbound sidecar: 
LOG(ERROR) << "Failed to get bloom filter sidecar" << ...


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              : 
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> That 'propagation stage' above should be 'aggregation stage' instead. Sorry
Actually, it's possible for RuntimeFilterBank::UpdateFilterFromLocal() and FragmentInstanceState::Close() to be called from different thread contexts as the existing code still has async build thread (https://github.com/apache/impala/blob/master/be/src/exec/blocking-join-node.cc#L212-L215).

However, we will always wait for that build thread to complete before returning (https://github.com/apache/impala/blob/master/be/src/exec/blocking-join-node.cc#L228-L230) so the build thread should have exited by the time we call Close() in the fragment instance thread so it could't have issued an RPC again when we reach this function.

May be worth adding a comment about it:

The async build thread should have exited by the time Close() is called so there shouldn't be any new RPCs being issued when this function is called.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@119
PS24, Line 119: req
Per comments elsewhere, we usually pass const value by reference *req.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/impala-server.h@348
PS24, Line 348: const UpdateFilterParamsPB*
Use const UpdateFilterParamsPB& params. See comments in data-stream-service.cc


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@218
PS24, Line 218: const string& directory_in
Please see comments in coordinator.cc. Please consider passing in the slide directly instead.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/util/bloom-filter.cc@219
PS24, Line 219: string*
This may be char* instead.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 10 Oct 2019 00:30:55 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#18). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
41 files changed, 1,137 insertions(+), 738 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/18
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 18
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#24). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,078 insertions(+), 745 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/24
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 31: Code-Review+2


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 31
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 21:32:07 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 14:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/13882/14//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13882/14//COMMIT_MSG@14
PS14, Line 14: respctively
typo


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1089
PS14, Line 1089:       if (params->bloom_filter().has_directory_sidecar_idx()) {
Is it actually possible that this won't be set or can we DCHECK this? (I think its possible with the way you have the code structured right now, but we might want to change that, see my other comments)

If it is possible, do we have the right behavior here? I guess what will happen is we'll proceed to updating the bloom filter with an empty directory, which could for example hit a DCHECK in BloomFilter::Or which expects the two directories to be the same size.


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1099
PS14, Line 1099: .ToString()
This ToString() performs a copy of the data. Just call 'size()' on the slice directly.


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1110
PS14, Line 1110:           bloom_filter_directory_ = sidecar_slice.ToString();
I think this will perform two copies - one to do ToString() and one to do the assignment.

I think we should be able to do this without any copies by using 'std::swap'. You may need to make bloom_filter_directory_ a char* to make it work


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc@a103
PS14, Line 103: 
keep this comment


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc@208
PS14, Line 208: if (params->has_bloom_filter())
I think this is already guaranteed by the DCHECK above


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h@46
PS14, Line 46: using kudu::Slice;
please don't use 'using' in header files


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h@90
PS14, Line 90:   Bucket* GetDirectory() const { return directory_; }
Hmm, definitely preferable to keep all of this stuff private. 

I think the only reason you exposed this is for bloom-filter-test.cc/bloom-filter-benchmark.cc? If so, you should be able to just make the testing classes friends of this class

For bloom-filter-test.cc that should be easy, just 'friend class BloomFilterTest'. Its a little more complicated for bloom-filter-benchmark.cc since there's multiple classes there, but maybe you can add a single super class for them? Or else just list all three with a comment about what they are.

If you run into issues with that, another option is to have ToProtobuf take a 'Slice*' or whatever that it sets to the directory instead of take the RpcController.


http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.cc@99
PS14, Line 99:     LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
What should we do in this case? Right now, callers of this function won't know that the error happened, so they'll go ahead and send the BloomFilterPB even though its not really valid, since it doesn't have a directory or always_true/false set.

How about in this case we 'disable' the bloom filter, i.e. set always_true = true. Then over in the coordinator we'll know that if always_false = always_true = false it must be the case that sidecar_idx is set.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 13 Aug 2019 18:05:51 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 17:

Hi Thomas and Michael, I have addressed all of your previous comments. Please review this revised patch. Thanks!


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 26 Aug 2019 23:04:02 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 17:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4373/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 26 Aug 2019 23:44:52 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

(28 comments)

Please review the updated patch set revised according to your previous comments. Thank you very much!

http://gerrit.cloudera.org:8080/#/c/13882/11//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13882/11//COMMIT_MSG@11
PS11, Line 11: Having incorporated sidecar on the aggregation stage when
> This sort of stuff shouldn't be included in the commit message
Thanks! I have removed this.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h
File be/src/runtime/backend-client.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@19
PS11, Line 19: #define IMPALA_BACKEND_CLIENT_H
> Should this file be deleted after this change ?
I recall that Thomas once mentioned that we could file a follow-up JIRA to delete unnecessary code after porting the runtime filter to KRPC since it would be easier for reviewers to review the patch set. Please let me know how I should proceed. Thanks!


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@25
PS11, Line 25: #include "gen-cpp/ImpalaInternalService.h"
             : 
> try not to leave commented out code in patches you submit for review
Sure. I have removed these two lines.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@37
PS11, Line 37: 
             :   
> try not to include unrelated formatting changes in patches you submit for r
Sure. I have excluded these unrelated changes.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@52
PS11, Line 52: 
Sure. I have removed them.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@45
PS11, Line 45: #include "util/scope-exit-trigger.h"
             : #include "util/uid-util.h"
> nit: alphabetize
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@539
PS11, Line 539: DCHECK(ProtoToQueryId(rpc_params.dst_query_id()) == query_
> Since this is only used once, you can just put the call inside the DCHECK. 
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@551
PS11, Line 551:  not a query-wide error - the remot
> This is wrong, see: https://gerrit.cloudera.org/#/c/13818/  (you'll just ne
Thanks for pointing this out! I have fixed this.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@555
PS11, Line 555: 
> Drop this. GetProxy() is not something that we'll probably ever retry. If y
Thanks for the explanation. I have dropped this.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@556
PS11, Line 556: 
> WARNING
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@566
PS11, Line 566: 
> WARNING
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@563
PS11, Line 563:   controller.Reset();
              :   }
              : }
              : 
              : Coordinator::BackendState::InstanceStats::InstanceStats(
              :     const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
              :     ObjectPool* obj_pool)
> Let's keep it like it was before an not retry. We may someday want to consi
Thanks very much for the detailed explanation! I have removed this loop to keep the code like it was before.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-filter-state.h@68
PS11, Line 68: &
> I think we can avoid making this change, see my comment in coordinator.cc
Thanks for pointing this out. I have made the change accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.h@32
PS11, Line 32: #include "gen-cpp/data_stream_service.pb.h"
> I don't think you need to import control_service here
Indeed. It has been removed.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@926
PS11, Line 926:   lock_guard<SpinLock> l(backend_states_init_lock_);
> no need to add more whitespace
Thanks. It has been removed.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@976
PS11, Line 976: 
> remove this
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@977
PS11, Line 977:     if (filter_updates_received_->value() == 0) {
> I think we can swap this around, i.e. aggregated_filter.Swap(rpc_params.mut
Thanks for pointing this out. I have made the change accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@992
PS11, Line 992:     if (target.is_local) continue;
              :       target_fragment_idxs.insert(target.fragment_idx);
              :     }
> This can be simplified to :
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@998
PS11, Line 998:       BloomFilterPB& aggregated_filter = state->bloom_filter();
> remove this
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1008
PS11, Line 1008:     }
> The formatting was correct before, see the output of clang-format
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1040
PS11, Line 1040: rst_arrival_time_ == 0L) {
> update this
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1066
PS11, Line 1066:   
> formatting
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/data-stream-service.cc@136
PS11, Line 136: Substitute("Query Stat
> Is it a case in which it's legitimate for query state to be not found here 
Thanks for pointing this out. I have modified the error message accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.h
File be/src/service/impala-internal-service.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.h@18
PS11, Line 18: #ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
> Shouldn't this file be deleted after this change ?
I recall that Thomas once mentioned that we could file a follow-up JIRA to delete unnecessary code after porting the runtime filter to KRPC since it would be easier for reviewers to review the patch set. Please let me know how I should proceed. Thanks!


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-internal-service.cc@36
PS11, Line 36: ImpalaInternalService::ImpalaInternalService() {
> Same question. Shouldn't this file be deleted after this change ? Also, we 
I recall that Thomas once mentioned that we could file a follow-up JIRA to delete unnecessary code after porting the runtime filter to KRPC since it would be easier for reviewers to review the patch set. Please let me know how I should proceed. Thanks!


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/service/impala-server.h@342
PS11, Line 342:     const TPingImpalaHS2Servic
> Stale comment. ImpalaInternalService should not exist anymore after this ch
Thanks for pointing this out. I have removed this comment.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/util/bloom-filter.h@79
PS11, Line 79:  multiple times.
> Stale comment
Done


http://gerrit.cloudera.org:8080/#/c/13882/11/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/11/common/protobuf/common.proto@43
PS11, Line 43: This is a union over all possible return types
> nit: capitalize and period
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 02 Aug 2019 05:31:54 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 25:

(25 comments)

Hi Michael and Thomas, I have addressed most of Michael's comments in the previous iteration but have not uploaded the revised version since there are still four comments that I do not know exactly how to resolve. Please let me know how I should proceed. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556
PS25, Line 556:     LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
> return here as there is no point in continuing to print the result status i
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010
PS25, Line 1010: // >>>>>>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
> To be removed ?!
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: std::
> nit: no need for std::
Thanks for pointing this out.

I have checked whether or not it is possible to move the declarations of 'rpc_params', 'target_fragment_idxs', and 'bloom_filter_directory' closer to their use but since these three variables are used both inside and outside the critical section below, it seems that we are not able to do that.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072:       BloomFilterPB& aggregated_filter = state->bloom_filter();
               :       aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
> This swapping tricks seems unnecessary now that the bloom_filter_directory 
Thanks for the suggestion!

I have tried the following.

1. 
BloomFilterPB aggregated_filter;
aggregated_filter.CopyFrom(state->bloom_filter());
rpc_params.set_allocated_bloom_filter(&aggregated_filter);

This approach would result in the following error at runtime according to AddressSanitizer.

==26704==ERROR: AddressSanitizer: stack-use-after-scope on address 0x7fde29b39530 at pc 0x0000023b5cef bp 0x7fde29b392f0 sp 0x7fde29b392e8

2.
rpc_params.set_allocated_bloom_filter(std::move(&(state->bloom_filter())));

3.
rpc_params.set_allocated_bloom_filter(&(state->bloom_filter()));

Both the second and the third approaches would result in the following error reported by AddressSanitizer.

==19218==ERROR: AddressSanitizer: attempting free on address which was not malloc()-ed: 0x6130001a0ca0 in thread T126

I think this is because the contents we put in 'rpc_params' has to live beyond this function, i.e., Coordinator::UpdateFilter().

To address this issue, we may need to allocate a piece of memory in the heap and perform the copy operation by ourselves. But since 'data_stream_service.pb.cc' (generated by the Protobuf compiler) has already provided such a functionality, i.e., 'Swap()', I was wondering if it would be easier to just use this provided function.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
> Is there a reason to assign state->bloom_filter_directory() to this local v
Thanks for the suggestion.

According to the current implementation, the variable 'state' is defined according to the variable 'it', both of which are defined inside the critical section. To address your suggestion, we may need to move the following two statements out of the critical section.

1. auto it = filter_routing_table_->id_to_filter.find(params.filter_id()), and
2. FilterState* state = &it->second.

However, it seems this will also move the use of the variable 'it' away from its definition.

Please let me know how you would like to proceed. Thanks!


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098:  reinterpret_cast<const char*>(&(bloom_filter_directory[0])
> state->bloom_filter_directory().data()
Since 'state' is defined inside of the critical section as mentioned above, we might not be able to replace this argument with 'state->bloom_filter_directory().data()'.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
> bloom_filter_ = params.bloom_filter();
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size())
> Can keep the original suggestion of sidecar_slice.ToString(). C++ ROV shoul
Thanks for pointing this out. I have changed this accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@190
PS25, Line 190: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@204
PS25, Line 204: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@205
PS25, Line 205:       ++num_inflight_rpcs_;
> DCHECK_GE(num_inflight_rpcs_, 0); before increment.
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@323
PS25, Line 323: std::
> nit: no need for std::
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@170
PS25, Line 170:   // Store the binary representation of this TimestampValue in 'tvalue'.
              :   void ToTColumnValue(TColumnValue* tvalue) const {
              :     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
              :     tvalue->timestamp_val.assign(data, data + Size());
              :     tvalue->__isset.timestamp_val = true;
              :   }
> Is this not used anymore ?
Thanks for pointing this out! Indeed, it is not used anymore. I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@183
PS25, Line 183:   // Returns a new TimestampValue created from the value in 'tvalue'.
              :   static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
              :     TimestampValue value;
              :     memcpy(&value, tvalue.timestamp_val.c_str(), Size());
              :     value.Validate();
              :     return value;
              :   }
> Is this not used anymore ?
Thanks! I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
> This can be removed.
Thanks for this suggestion.

After trying to remove this header file, I got the following 2 error messages.

1.
In file included from be/src/rpc/thrift-server-test.cc:24:
be/src/rpc/thrift-client.h:134:18: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
      iface_(new InterfaceType(protocol_)),
                 ^~~~~~~~~~~~~
be/src/rpc/thrift-server-test.cc:526:30: note: in instantiation of member function 'impala::ThriftClient<impala::ImpalaInternalServiceClient>::ThriftClient' requested here
        Client* client = new Client("127.0.0.1", port, "", nullptr, false);
                             ^
be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
class ImpalaInternalServiceClient;
      ^

2.
In file included from be/src/rpc/thrift-server-test.cc:24:
be/src/rpc/thrift-client.h:162:20: error: allocation of incomplete type 'impala::ImpalaInternalServiceClient'
  iface_.reset(new InterfaceType(protocol_));
                   ^~~~~~~~~~~~~
be/src/runtime/client-cache-types.h:32:7: note: forward declaration of 'impala::ImpalaInternalServiceClient'
class ImpalaInternalServiceClient;
      ^
According to the errors, it seems that in 'thrift-server-test.cc', 'ImpalaInternalServiceClient' is used in the following statement.

using Client = ThriftClient<ImpalaInternalServiceClient>; (https://github.com/apache/impala/blob/master/be/src/rpc/thrift-server-test.cc#L525)

Recall that 'ImpalaInternalServiceClient' is defined in 'ImpalaInternalService.h' so that I guess it is the reason why we have these errors.

If this is the case, then I think we need to at least remove 'thrift-server-test.cc' as well. Therefore my question here is whether or not we could remove this BE test here, or we do it in a follow-up JIRA. Thanks!


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TReportExecStatusArgs;
             : class TReportExecStatusResult;
> Not your change but these can be removed.
Thanks for pointing this out!

I also found that three classes above are not used as well. I guess we could also remove them, right?

1. class TCatalogUpdate;
2. class TPlanExecRequest;
3. class TPlanExecParams;


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@75
PS25, Line 75: std::shared_ptr<RpcController
> Why shared_ptr ?
Thanks for pointing this out!

It does not have to be a shared_ptr. I have changed 'controller_x', 'controller_y' to objects of RpcController's. 

The types of 'controller_x2', and 'controller_y2' in the following are also changed accordingly.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@352
PS25, Line 352: std::shared_ptr
> Why shared_ptr ?
Thanks for pointing this out.

I have changed 'controller' to an object of RpcController instead like above.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@125
PS25, Line 125: DCHECK_EQ(protobuf->always_false(), false);
> DCHECK(!protobuf->always_false());
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@236
PS25, Line 236: &(directory_in[0])
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@237
PS25, Line 237: &(directory_out[0])
> directory_out
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@239
PS25, Line 239: &(directory_in[0])
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@241
PS25, Line 241: &(directory_in[0]
> directory_in
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@242
PS25, Line 242: &(directory_out[0])
> directory_out
Done


http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto@43
PS25, Line 43: // This is a union over all possible return types.
> If we upgrade to proto3 above, we can use the oneof feature in protobuf3 if
Thanks very much for the suggestion. I have added a TODO here accordingly.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 28 Oct 2019 15:39:30 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#5). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC w/o sidecar.

Testing:
1. Have tested this patch set using those EE cases in test_runtime_filters.py
on my local dev box.
2. Have tested this patch set using those BE tests in bloom-filter-test.cc
on my local dev box.

TODO:
Incorporate sidecar.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 776 insertions(+), 690 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/5
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 5
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 31:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/5199/ DRY_RUN=false


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 31
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 21:32:08 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              : 
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
> Thanks very much for the comment.
That 'propagation stage' above should be 'aggregation stage' instead. Sorry for any confusion caused.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Sun, 29 Sep 2019 18:25:59 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4633/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 24 Sep 2019 23:03:46 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 27:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/timestamp-value.h@175
PS27, Line 175:   
line has trailing whitespace



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 27
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 30 Oct 2019 03:31:59 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 12:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/12/be/src/util/bloom-filter.h@87
PS12, Line 87:   /// Converts 'filter' to its corresponding Protobuf representation. If the first argument
line too long (91 > 90)



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 12
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 02 Aug 2019 05:29:54 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 30: Code-Review+2


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 30
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 08 Nov 2019 00:12:41 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 18:

(16 comments)

Hi Thomas and Michael, I have addressed all the previous comments. Please review my revised patch. Thank you very much!

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h@48
PS17, Line 48: namespace impala {
> don't use 'using' in header files
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@986
PS17, Line 986: 
              : 
              :   exe
> You can leave the formatting as-is
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1041
PS17, Line 1041: 
> Its unfortunate that this logic is duplicated between here and BloomFilter:
Thanks very much for the detailed explanation! 

As we have discussed, we will adopt the first option since it requires much fewer modifications to the current code. Otherwise, other than 'BloomFilter::Or()' and those places that make use of that 'BloomFilterPB' and its respective directory stored by the class 'Coordinator::FilterState', we also have to make the corresponding changes to 'bloom-filter-test.cc' and 'bloom-filter-benchmark.cc', which might take much more time to implement.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1089
PS17, Line 1089: reinterpret_cast<const char*>(&((bloom_filter_directory)[0])),
               :           static_cast<unsigned long>(bloom_filter
> Might be cleaner to check has_directory_sidecar_idx() here, which should be
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1096
PS17, Line 1096:     Coordinator* coord, RpcContext* context) {
> You need to handle this error, i.e. call Disable()
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@52
PS17, Line 52: /// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
> I don't think this is used anywhere
Thanks for pointing this out! I have removed it.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@142
PS17, Line 142: 
> I think this can be private.
Thanks for pointing this out. I have modified the name and made it private.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@154
PS17, Line 154: boost::uno
> num_inflight_rpcs_
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@133
PS17, Line 133:   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
> While thinking about the locking protocol in Close(), I noticed that we don
Thanks very much for the detailed explanation! I have double-checked what you have described above and I think your argument is correct. 

I have also added an additional 'DCHECK(!closed_);' with an additional brief comment as suggested.

On the other hand, I also found that RuntimeState::ReleaseResources() will also be called by Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow() in fe-support.cc. This function will do the work for the frontend to rewrite a given batch of const expressions according to the comments of this function. So I guess in this case we do not have to worry about RuntimeState::ReleaseResources() being called before RuntimeFilterBank::UpdateFilterFromLocal() since RuntimeFilterBank::UpdateFilterFromLocal() will not be called is such queries.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@187
PS17, Line 187: // Use 'proxy' to send the filter to the coordinator.
              :     std::unique_ptr<DataStreamServiceProxy> proxy;
              :     Status get_proxy_status =
              :      
> This needs to be moved down after the check for get_proxy_status.ok()
Thanks for pointing this out! I have moved this down after the check for get_proxy_status.ok().


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@199
PS17, Line 199:     // Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
> line too long (91 > 90)
Done


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@238
PS17, Line 238:         kudu::Status status =
> I don't think that we can just return here, we need to actually handle the 
Thanks for pointing this out. I have set 'bloom_filter' to 'BloomFilter::ALWAYS_TRUE_FILTER' if 'status.ok()' is false.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@243
PS17, Line 243: bloom_filter = BloomFilt
> This is doing an unnecessary copy, to Slice::ToString() copies all of the d
Thanks for the suggestion. I have added a function 'BloomFilter::Init()' that takes a 'const uint8_t*' and a 'size_t' provided by 'sidecar_slice.data()' and 'sidecar_slice.size()', respectively.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@314
PS17, Line 314:     BloomFilter::FalsePositiveProb(observed_ndv, BitUtil::Log2Ceiling64(filter_size));
              :   return fpp > FLAGS_max_filter_error_rate;
              : }
> This comment is a little weird, maybe rephrase it more like "Wait for all i
Thank you for the suggestion. I have rephrased this comment as suggested.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@317
PS17, Line 317: 
              : void RuntimeFilterBank::Close() {
              :   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
> I don't think it technically matters here, but always better not to hold tw
Thanks for the explanation! I have put this in its own {} block.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@324
PS17, Line 324:     }
> unnecessary extra whitespace
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 18
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 09 Sep 2019 15:12:11 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 27:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc
File be/src/util/min-max-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a696
PS27, Line 696: 
Don't remove this comment


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a701
PS27, Line 701: 
Don't remove this comment


http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a704
PS27, Line 704: 
Don't remove this. I'm surprised this isn't causing any test failures, since we should be hitting the DCHECK below now.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 27
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 05 Nov 2019 01:20:37 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 17:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@199
PS17, Line 199:       LOG(INFO) << "Couldn't send filter to coordinator: " << get_proxy_status.msg().msg();
line too long (91 > 90)



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 26 Aug 2019 23:03:40 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 25:

(26 comments)

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556
PS25, Line 556:     LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
return here as there is no point in continuing to print the result status if RPC failed.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: r is neither an always true filter nor an
               :       // always false filter, then it must be the case that a non-empty sidecar slice
> I actually found out that if we are using "std::move(sidecar_slice.ToString
Sorry for the wrong advice. I believe the RVO should take care of the copy-elision so no need for the std::move. In other words, it should be sufficient to just call sidecar_slice.ToString();


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010
PS25, Line 1010: // >>>>>>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
To be removed ?!


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: std::
nit: no need for std::

Also, may make sense to move this declaration to somewhere closer to its use (e.g. line 1068 below).

Same for rpc_params and target_fragment_idxs above.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072:       BloomFilterPB& aggregated_filter = state->bloom_filter();
               :       aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
This swapping tricks seems unnecessary now that the bloom_filter_directory is a separate structure.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
Is there a reason to assign state->bloom_filter_directory() to this local variable ? Why cannot we use state->bloom_filter_directory() directly below ?


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098:  reinterpret_cast<const char*>(&(bloom_filter_directory[0])
state->bloom_filter_directory().data()


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
bloom_filter_ = params.bloom_filter();

The above should be sufficient.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size())
Can keep the original suggestion of sidecar_slice.ToString(). C++ ROV should take care of avoiding the copy so the TODO can be removed. There is no easy way to avoid copying the sidecar from the network buffer so we have to copy at least once.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@190
PS25, Line 190: std::
nit: no need for std::


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@204
PS25, Line 204: std::
nit: no need for std::


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@205
PS25, Line 205:       ++num_inflight_rpcs_;
DCHECK_GE(num_inflight_rpcs_, 0); before increment.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@323
PS25, Line 323: std::
nit: no need for std::


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h
File be/src/runtime/timestamp-value.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@170
PS25, Line 170:   // Store the binary representation of this TimestampValue in 'tvalue'.
              :   void ToTColumnValue(TColumnValue* tvalue) const {
              :     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
              :     tvalue->timestamp_val.assign(data, data + Size());
              :     tvalue->__isset.timestamp_val = true;
              :   }
Is this not used anymore ?


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@183
PS25, Line 183:   // Returns a new TimestampValue created from the value in 'tvalue'.
              :   static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
              :     TimestampValue value;
              :     memcpy(&value, tvalue.timestamp_val.c_str(), Size());
              :     value.Validate();
              :     return value;
              :   }
Is this not used anymore ?


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
This can be removed.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TReportExecStatusArgs;
             : class TReportExecStatusResult;
Not your change but these can be removed.


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@75
PS25, Line 75: std::shared_ptr<RpcController
Why shared_ptr ?


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter-test.cc@352
PS25, Line 352: std::shared_ptr
Why shared_ptr ?


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@125
PS25, Line 125: DCHECK_EQ(protobuf->always_false(), false);
DCHECK(!protobuf->always_false());


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@236
PS25, Line 236: &(directory_in[0])
directory_in


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@237
PS25, Line 237: &(directory_out[0])
directory_out


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@239
PS25, Line 239: &(directory_in[0])
directory_in


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@241
PS25, Line 241: &(directory_in[0]
directory_in


http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/util/bloom-filter.cc@242
PS25, Line 242: &(directory_out[0])
directory_out


http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/25/common/protobuf/common.proto@43
PS25, Line 43: // This is a union over all possible return types.
If we upgrade to proto3 above, we can use the oneof feature in protobuf3 if only one of the fields below is set at a time. This saves some memory. May be worth considering but could be a TODO.

https://developers.google.com/protocol-buffers/docs/proto3#oneof



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 24 Oct 2019 01:35:21 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 29:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h@162
PS29, Line 162: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
              :       kudu::rpc::RpcController* controller, const char* directory,
              :       unsigned long directory_size);
              :   static void AddDirectorySidecar(BloomFilterPB* rpc_params,
              :       kudu::rpc::RpcController* controller, const string& directory);
I don't think its necessary to have two versions of this function. You should be able to eliminate the 'const char*' one and just have the 'const string& one'



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 29
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 07 Nov 2019 20:10:25 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 18:

(31 comments)

Thanks so much for all of your work on this! Its getting pretty close, most of these comments are cosmetic

One thing: instead of me going through and marking any formatting issues, please run clang-format and take its suggestions. Let me know if you need help with this. Keep in mind that clang-format is just a suggestion and you should:
- keep formatting consistent with the rest of the surrounding file or with things that you know are impala's guidelines, even if clang-format disagrees
- avoid  changing lines that otherwise wouldn't be affected by your patch just to fix the formatting, eg. you don't need to re-arrange includes that were already present even if clang-format says to

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.h
File be/src/runtime/coordinator-backend-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.h@42
PS18, Line 42: namespace kudu {
             : namespace rpc {
             : class RpcController;
             : } // namespace rpc
             : } // namespace kudu
Is this needed? rpc_controller.h is being imported above already


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@551
PS18, Line 551: DCHECK
DCHECK_EQ?


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@570
PS18, Line 570: kudu::Status rpc_status;
              :   rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
This can all be one line


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@572
PS18, Line 572:   if (!rpc_status.ok()) {
In addition to rpc_status, which just reflects whether the rpc actually got to the other machine and was responded to, there's also PublishFilterResultPB::status, which reflects whether the operation itself was successful, so we should add another 'if' that checks that and logs if its an error.


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@994
PS18, Line 994: FilterState* state;
I don't think there's any reason to move this here


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1013
PS18, Line 1013:     // Check if the filter has already been sent, which could happen in four cases:
I think we need another case in this comment for 'failed to get bloom filter sidecar'


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1070
PS18, Line 1070:       bs->PublishFilter(rpc_params, controller);
Could you add a TODO: make this asynchronous


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1083
PS18, Line 1083: void Coordinator::SetupSidecarForBloomFilter(PublishFilterParamsPB* rpc_params,
This function is unnecessary, you can just call AddSidecar directly


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1089
PS18, Line 1089: (
extra parentheses


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1115
PS18, Line 1115: // If the incoming Bloom filter is neither an always true filter nor an
               :       // always false filter, then it must be the case that a non-empty sidecar slice
               :       // has been received. Refer to BloomFilter::ToProtobuf() for further details.
Comment is outdated.

Thinking through the cases here, I think that this all works correctly, but its a little weird.

In particular, in the case where params->bloom_filter() is always_false:
- if bloom_filter_ is also always_false, then we'll TryConsume(0), do a swap() that doesn't actually change the value of bloom_filter_ since its still always_false, and then re-instantiate bloom_filter_directory_ to an empty string
- if bloom_filter_ is not always_false, then we'll call Or(), which will just return immediately without doing anything

This is how it worked before, so it would be fine to leave it, but I think that we can make this clearer by changing the 'else' on line 1109 to be 'else (!params->bloom_filter().always_false)' and skip all the extra work in that case. Then we can make the 'if' here a DCHECK(params->bloom_filter().has_directory_sidecar_idx()), and people won't have to reason through what the code below would do in the case that sidecar_slice doesn't get filled in.

You'll of course also want to add appropriate comments (and you should think it through and make sure my reasoning is correct)


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.h@147
PS18, Line 147:   /// Lock protecting those BloomFilter's in 'bloom_filters_' from being
Lets move this somewhere further down, so that runtime_filter_lock_ is next to the things it protects. Also rename 'num_krpcs_counter_lock_' to 'num_inflight_rpcs_lock_', change the comment above it to just say that it protects 'num_inflight_rpcs_' and also that it shouldn't be taken at the same time as 'runtime_filter_lock_'.

Then put a comment above num_inflight_rpcs_ explaining what its used for (as mentioned elsewhere, you can just use your existing comment from the cc file)


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.h@194
PS18, Line 194:   void UpdateFilterCompleteCb(const kudu::rpc::RpcController* rpc_controller);
brief comment


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@41
PS18, Line 41: #include "service/control-service.h"
I don't think you need this (or probably a couple of the other imports you added here)


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@112
PS18, Line 112: void RuntimeFilterBank::UpdateFilterCompleteCb(
I think we'll also want to pass in the UpdateFilterResultPB* here by inclusing it in the bind() below, so that we can check it's 'status'. I think in the current code in data-stream-service that it can only be OK, so its fine to DCHECK on it.


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@190
PS18, Line 190: krpc_address, krpc_address.hostname
This is wrong. See https://gerrit.cloudera.org/#/c/13818/


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@199
PS18, Line 199:  // Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
              :     // KRPC calls to prevent the memory pointed to by 'bloom_filter' being
              :     // deallocated in RuntimeFilterBank::Close() before all KRPC calls have
              :     // been completed.
Lets move this to above the definition of num_inflight_rpcs_ in the header


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@237
PS18, Line 237:       if (params->bloom_filter().has_directory_sidecar_idx()) {
Maybe add en else{} that just DCHECKs that params->bloom_filter().always_false is true, since that's the only reason this shouldn't be set.


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/runtime-filter-bank.cc@242
PS18, Line 242: LOG(ERROR)
Probably WARNING, since we're not considering this a reason to fail the query


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter-test.cc@368
PS18, Line 368: (NULL
nullptr


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@30
PS18, Line 30: #include "gen-cpp/control_service.pb.h"
I don't think that you need this


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@38
PS18, Line 38: class Slice;
This isn't used


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@95
PS18, Line 95: Status Init(const BloomFilterPB& protobuf, const std::string& directory);
We can eliminate this version, since its basically exactly the same as the one below. Just have the code that calls this instead call .data() and .size() on the string when passing it in.


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@106
PS18, Line 106: /// Also uses a sidecar buffer to instantiate an outbound sidecar, which in turn
              :   /// is passed to 'controller' to get a sidecar index which will be used on the client
              :   /// side to retrieve the contents stored in the sidecar buffer.
I think this can be simplified to something more like "Also sets a sidecar on 'controller' containing the bloom filter's directory"


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@125
PS18, Line 125: /// 'directory_in' is the string associated with the BloomFilterPB 'in' and
              :   /// 'directory_out' is the string associated with the BloomFilterPB 'out'.
This can be eliminated, its fairly obvious from the names


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@223
PS18, Line 223: /// If the first argument is NULL, it is interpreted as a complete filter which
              :   /// contains all elements.
              :   /// It also uses a sidecar buffer to instantiate an outbound sidecar, which in turn
              :   /// is passed to 'controller' to get a sidecar index which will be used on the
              :   /// receiving side to retrieve the contents stored in the sidecar buffer.
This can be eliminated (its covered by the comment for the public ToProtobuf above)


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@229
PS18, Line 229: void AddSidecar(BloomFilterPB* protobuf,
              :       kudu::rpc::RpcController* controller) const;
I don't think this is actually used anywhere


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@231
PS18, Line 231: AddSidecar
It might be more clear to name this something like AddDirectorySidecar(), and also add a brief comment saying what it does, including noting any requirements this function has (eg. it looks like you require that the filter is neither always_true nor always_false)

And of course, when you remove the 'friend' below it'll need to be made public


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.h@247
PS18, Line 247: friend class Coordinator;
Remove this.

As a general rule, we avoid using 'friend' because it messes up encapsulation. The main exception is for tests, where encapsulation doesn't matter as much.


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.cc@92
PS18, Line 92:     unsigned long directory_size) {
Look like this function requires that the filter is not always_false. You should add a DCHECK for that, and also for any other requirements, eg. that protobuf != nullptr


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.cc@249
PS18, Line 249: (
I think you got an unnecessary pair of parentheses here and around directory_out on this line, and also in several places on the lines below. Makes it a better hard to read what's going on


http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/util/bloom-filter.cc@257
PS18, Line 257:     __m128i* simd_out = reinterpret_cast<__m128i*>(&((*(directory_out))[0]));
Might as well leave this where it was, to keep the comment next to the loop its referring to



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 18
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 09 Sep 2019 19:15:22 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 7:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/3936/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 7
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Sun, 21 Jul 2019 02:44:47 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 19:

(1 comment)

There is an additional note added to the comments on Line 1117. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1115
PS18, Line 1115: if (!status.ok()) {
               :         LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
               :         Disable(coord->filter_mem_tracker_);
> Thanks very much for the detailed explanation!
I think the observation above actually indicates that even though the Bloom filter fed to RuntimeFilterBank::PublishGlobalFilter() corresponds to an always false filter, our current implementation will still call BloomFilter::Init() to allocate the memory space, which seems like a redundant and wasteful operation because the allocated Bloom filter is basically a bit array of all 0's. Maybe we could also simplify the current logic in a follow-up JIRA.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 19
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Thu, 12 Sep 2019 22:33:52 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................


Patch Set 11:

(22 comments)

http://gerrit.cloudera.org:8080/#/c/13882/11//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13882/11//COMMIT_MSG@11
PS11, Line 11: Misc note: fixed the memory leak in data-stream-service.cc.
This sort of stuff shouldn't be included in the commit message


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h
File be/src/runtime/backend-client.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@25
PS11, Line 25: //#include "gen-cpp/control_service.pb.h"
             : //#include "gen-cpp/data_stream_service.pb.h"
try not to leave commented out code in patches you submit for review


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@37
PS11, Line 37: 
             :   
try not to include unrelated formatting changes in patches you submit for review


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/backend-client.h@47
PS11, Line 47: /// We intentionally disable this clang warning as we intend to hide the
             : /// the same-named functions defined in the base class.
             : #pragma clang diagnostic push
             : #pragma clang diagnostic ignored "-Woverloaded-virtual"
             : 
             : #pragma clang diagnostic pop
this can all be removed since you removed the code that was causing the clang warning


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@45
PS11, Line 45: #include "gen-cpp/data_stream_service.proxy.h"
             : #include "service/data-stream-service.h"
nit: alphabetize


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@539
PS11, Line 539: TUniqueId qid = ProtoToQueryId(rpc_params.dst_query_id());
Since this is only used once, you can just put the call inside the DCHECK. That will also save the cost of doing the conversion on release builds when the DCHECK is elided.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@551
PS11, Line 551: krpc_address, krpc_address.hostname
This is wrong, see: https://gerrit.cloudera.org/#/c/13818/  (you'll just need to do the same thing as the other calls to GetProxy in this class)


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@555
PS11, Line 555: // TODO: Retry.
Drop this. GetProxy() is not something that we'll probably ever retry. If you go look through the code for it, I think the only way it can fail is if the network address doesn't parse correctly, which is not something that will be fixed by retrying.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@556
PS11, Line 556: LOG(INFO)
WARNING


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@566
PS11, Line 566: LOG(INFO)
WARNING


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-backend-state.cc@563
PS11, Line 563: do {
              :     rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
              :     if (!rpc_status.ok()) {
              :       LOG(INFO) << "PublishFilterPB() failed: " << rpc_status.message().ToString();
              :       controller.Reset();
              :     }
              :   } while (!rpc_status.ok());
Let's keep it like it was before an not retry. We may someday want to consider retrying, but its best to do things incrementally and keep this patch focused just on the refactor.

That allows us to think stuff like this through more when we actually do it, eg. you wouldn't want to retry like this because if it never succeeds, say because the impalad you're trying to send to went down, you're stuck in an infinite loop. But then that starts opening up questions like how many times should we retry, how long should we wait between retries, should we retry regardless of what the error is or are some errors clearly non-recoverable and we can stop, etc.


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator-filter-state.h@68
PS11, Line 68: *
I think we can avoid making this change, see my comment in coordinator.cc


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.h@32
PS11, Line 32: #include "gen-cpp/control_service.pb.h"
I don't think you need to import control_service here


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@926
PS11, Line 926: 
no need to add more whitespace


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@976
PS11, Line 976: // To check whether or not the following usage of Swap function is correct
remove this


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@977
PS11, Line 977:       rpc_params.mutable_bloom_filter()->Swap(aggregated_filter);
I think we can swap this around, i.e. aggregated_filter.Swap(rpc_params.mutable_bloom_filter()) to avoid the change in coordinator-filter-state.h


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@992
PS11, Line 992: UniqueIdPB* qid = new UniqueIdPB;
              :   TUniqueIdToUniqueIdPB(query_id(), qid);
              :   rpc_params.set_allocated_dst_query_id(qid);
This can be simplified to :
TUniqueIdTOUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@998
PS11, Line 998:   LOG(INFO) << "target_fragment_idxs.size(): " << target_fragment_idxs.size();
remove this


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1008
PS11, Line 1008:   cleanup:
The formatting was correct before, see the output of clang-format


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1040
PS11, Line 1040: // Workaround for fact that parameters are const& for Thrift RPCs
update this


http://gerrit.cloudera.org:8080/#/c/13882/11/be/src/runtime/coordinator.cc@1066
PS11, Line 1066:   
formatting


http://gerrit.cloudera.org:8080/#/c/13882/11/common/protobuf/common.proto
File common/protobuf/common.proto:

http://gerrit.cloudera.org:8080/#/c/13882/11/common/protobuf/common.proto@43
PS11, Line 43: this is a union over all possible return types
nit: capitalize and period

Obviously we're somewhat inconsistent about this across the codebase, but we try to be consistent within individual files.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 11
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Fri, 26 Jul 2019 19:42:58 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 22:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h@46
PS22, Line 46: namespace bloom_filter_test_util {
             : void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
             :     int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
             :     std::string* directory);
             : } // namespace bloom_filter_test_util
             : 
             : namespace either {
             : struct TestData;
             : } // namespace either
> Can these be moved to the test file ?
I guess not. Sorry for missing the friend declaration below.



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 22
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Tue, 24 Sep 2019 17:52:24 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#11). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC w/o sidecar.

Misc note: fixed the memory leak in data-stream-service.cc.

Testing:
Passed the exhaustive test of private parameterized test.

TODO:
Incorporate sidecar.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 753 insertions(+), 688 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/11
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 11
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#7). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC
......................................................................

IMPALA-7984: Port UpdateFilter() and PublishFilter() to KRPC

Port UpdateFilter() and PublishFilter() to KRPC w/o sidecar.

Testing:
1. Have tested this patch set using those EE cases in test_runtime_filters.py
on my local dev box.
2. Have tested this patch set using those BE tests in bloom-filter-test.cc
on my local dev box.

TODO:
Incorporate sidecar.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 774 insertions(+), 690 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/7
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 7
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 22:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4593/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 22
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 18 Sep 2019 22:27:13 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Thomas Tauber-Marshall (Code Review)" <ge...@cloudera.org>.
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 17:

(15 comments)

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h@48
PS17, Line 48: using kudu::rpc::RpcContext;
don't use 'using' in header files


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@986
PS17, Line 986: {
              :       return;
              :     }
You can leave the formatting as-is


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1041
PS17, Line 1041: void Coordinator::SetupSidecarForBloomFilter(PublishFilterParamsPB* rpc_params,
Its unfortunate that this logic is duplicated between here and BloomFilter::ToProtobuf, eg. because duplicating code makes it more likely there are bugs (for example I think that you have a bug here in the case where AddOutboundSidecar fails, since you don't disable the bloom filter, even though that bug is already fixed in BloomFilter::ToProtobuf)

There are a couple of possible solutions to this:
1. Move this to be a static function in BloomFilter, eg. BloomFilter::AddSidecar(), and call it from BloomFilter::ToProtobuf()
2. Instead of having Coordinator::FilterState store a BloomFilterPB and 'string directory' just actually reconstruct a BloomFilter object, i.e. around line 1110 below, by calling BloomFilter::Init() with sidecar_slice.data(). Then we would be able to just call the existing BloomFilter::ToProtobuf()
3. some other idea I didn't think of

Option 2 seems cleaner overall to me, since it allows us to better  hide the details of how bloom filters work in the BloomFilter class instead of leaking things like the existence of the 'directory' string into FilterState. 

However, its slightly less efficient (cause we add a transition from BloomFilterPB -> Bloom Filter object and back, with the required setup, eg. in BloomFilter::Init()), though not a ton less efficient since I think it can be done without adding any copies of the directory, and it probably also is more work for you in terms of code changes (eg. BloomFilter::Or would have to be slightly rewritten to take BloomFilter objects instead).

And, option 1 is consistent with the way things already work. So, I'll leave it up to you to decide.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1089
PS17, Line 1089: !params->bloom_filter().always_true() &&
               :           !params->bloom_filter().always_false())
Might be cleaner to check has_directory_sidecar_idx() here, which should be equivalent


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1096
PS17, Line 1096:           return;
You need to handle this error, i.e. call Disable()


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@52
PS17, Line 52: class UniqueIdPB;
I don't think this is used anywhere


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@142
PS17, Line 142:   void SendFilterToCoordinatorCompleteCb(const kudu::rpc::RpcController* rpc_controller);
I think this can be private.

Lets also rename it UpdateFilterCompleteCb, to keep it consistent with the name of the rpc.


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@154
PS17, Line 154: num_krpcs_
num_inflight_rpcs_


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@133
PS17, Line 133:   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
While thinking about the locking protocol in Close(), I noticed that we don't check 'closed_' here like we do at the start of PublishGlobalFilter()

I spent a little time looking at the code, and I think that its correct as is, because RuntimeFilterBank::Close() only gets called from RuntimeState::ReleaseResources(), which is only called from FragmentInstanceState::Close(), which can't be called until after Open() is run on exec_tree_ and UpdateFilterFromLocal() is only called during Open() (via PartitionedHashJoinNode::Open() -> BlockingJoinNode::ProcessBuildInputAndOpenProbe(), which calls on a thread and then blocks until completion for BlockingJoinNode::ProcessBuildInputAsync() -> BlockingJoinNode::SendBuildInputToSink() -> PhjBuilder::FlushFinal())

It might be nice if you could confirm my understanding is correct, and then add a DCHECK(!closed_); and a brief comment (eg. "This is only called from ExecNode::Open()). I won't hold up +2ing this patch for it, though, since its not a regression


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@187
PS17, Line 187: {
              :       std::unique_lock<SpinLock> l(num_krpcs_counter_lock_);
              :       num_krpcs_++;
              :     }
This needs to be moved down after the check for get_proxy_status.ok()

Otherwise, we could increment this, fail to get the proxy, never send the rpc so the callback is never called, never decrement it, and then Close() would hang forever


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@238
PS17, Line 238:           return;
I don't think that we can just return here, we need to actually handle the error, i.e. do what we've done elsewhere in this function when an error occurs and disable the bloom filter by setting it to BloomFilter::ALWAYS_TRUE_FILTER


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@243
PS17, Line 243: sidecar_slice.ToString()
This is doing an unnecessary copy, to Slice::ToString() copies all of the data but then BloomFilter::Init() also copies all of the data. We can just have BloomFilter::Init() take a 'const char*' and call sidecar_slice.data() here


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@314
PS17, Line 314: // Use this lock to prevent the BloomFilter's in this Bloom filter bank from
              :   // being closed before their respective asynchronous KRPC calls in
              :   // RuntimeFilterBank::SendFilterToCoordinator() finishes.
This comment is a little weird, maybe rephrase it more like "Wait for all inflight rpcs to complete before closing the filters"


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@317
PS17, Line 317: std::unique_lock<SpinLock> l1(num_krpcs_counter_lock_);
              :   while (num_krpcs_ > 0) {
              :     krpcs_done_cv_.wait_for(l1, std::chrono::milliseconds(50));
              :   }
I don't think it technically matters here, but always better not to hold two locks at the same time if its not necessary (to preclude the possibility of deadlock), and in general to hold locks for as short as possible, so I think that we can put this in its own {} block


http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@324
PS17, Line 324: 
unnecessary extra whitespace



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 17
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 04 Sep 2019 20:40:41 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Fang-Yu Rao (Code Review)" <ge...@cloudera.org>.
Fang-Yu Rao has uploaded a new patch set (#25). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is
implemented using Thrift RPC, which suffers from a disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respctively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
39 files changed, 1,086 insertions(+), 747 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/25
-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 25
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 29:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/4960/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 29
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 06 Nov 2019 07:13:31 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 24:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
               :               reinterpret_cast<const char*>(sidecar_slice.data()), sidecar_slice.size());
Why not std::move(sidecar_slice.ToString()) ?


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174
PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB);
Can you please double check if the parameters need to be preserved beyond this function ? According to (https://github.com/apache/impala/blob/master/be/src/kudu/rpc/proxy.h#L79-L84), it can be freed once the async RPC call is done.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175
PS24, Line 175:     UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
              :     RpcController* controller = obj_pool_.Add(new RpcController);
I wonder if we can keep these in thread local storage and initialize them once at init time. I suppose we don't invoke this RPC more than once per fragment instance thread, right ? Otherwise, it seems a bit wasteful to keep accumulating these objects in obj_pool_.


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320
PS24, Line 320:   // Wait for all inflight rpcs to complete before closing the filters.
              :   {
              :     std::unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
              :     while (num_inflight_rpcs_ > 0) {
              :       krpcs_done_cv_.wait(l1);
              :     }
              :   }
              : 
              :   lock_guard<mutex> l2(runtime_filter_lock_);
              :   closed_ = true;
Do you need to set closed_ to true before waiting for all in-flight RPCs to drain ? Otherwise, you may break out of the critical section above and then a thread may sneak in and try to issues RPC again. Or is it impossible because it's the same fragment instance thread which called RuntimeFilterBank::UpdateFilterFromLocal() ?


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@137
PS24, Line 137:  Substitute("Query State not found for query_id=$0",
              :         PrintId(ProtoToQueryId(req->dst_query_id()))
This can be refactored into a single string object and reuse them here and below in RespondAndReleaseRpc().


http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@141
PS24, Line 141: this->
no need for this



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 24
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Wed, 25 Sep 2019 16:58:23 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
......................................................................


Patch Set 22:

(10 comments)

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@565
PS22, Line 565: LOG(WARNING) 
LOG(ERROR)


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@572
PS22, Line 572: LOG(WARNING)
LOG(ERROR)


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@575
PS22, Line 575: LOG(WARNING)
LOG(ERROR)


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@120
PS22, Line 120: INFO
ERROR


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@126
PS22, Line 126:     std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
DCHECK_GT(num_inflight_rpcs_, 0);


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@127
PS22, Line 127: num_inflight_rpcs_--
nit: pre-increment/decrement:
   --num_inflight_rpcs_;


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@201
PS22, Line 201:       std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
Please add a comment on why this is necessary: Increment num_inflight_rpcs_ to make sure that the filter will not be deallocated in Close() until all in-flight RPC completes.


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@202
PS22, Line 202: num_inflight_rpcs_++;
++num_inflight_rpcs_;


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@321
PS22, Line 321: std::chrono::milliseconds(50))
In theory, we don't need a timeout here as we need to wait till the RPC is done. That said, it may not hurt to log a statement after waiting for extended period of time. 50 ms may be a bit iffy if the network is slow or if there is a lot of outbound traffic. May be 60s ?

One potential improvement is that we may as well cancel the RPC after waiting for too long.


http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h@46
PS22, Line 46: namespace bloom_filter_test_util {
             : void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
             :     int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
             :     std::string* directory);
             : } // namespace bloom_filter_test_util
             : 
             : namespace either {
             : struct TestData;
             : } // namespace either
Can these be moved to the test file ?



-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 22
Gerrit-Owner: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Fang-Yu Rao <fa...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tm...@cloudera.com>
Gerrit-Comment-Date: Mon, 23 Sep 2019 20:52:45 +0000
Gerrit-HasComments: Yes