Posted to dev@impala.apache.org by "Lars Volker (Code Review)" <ge...@cloudera.org> on 2016/06/24 21:47:35 UTC

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Lars Volker has uploaded a new change for review.

  http://gerrit.cloudera.org:8080/3390

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................

PREVIEW IMPALA-2550: RPC batching

This change replaces the ExecPlanFragment RPC with an ExecPlanFragments RPC,
which coalesces the startup of multiple fragment instances into a single call.

TODO: elaborate

Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
---
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node-test.cc
M be/src/exec/kudu-table-sink-test.cc
M be/src/exec/union-node.cc
M be/src/exprs/expr-test.cc
M be/src/runtime/buffered-block-mgr-test.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/plan-fragment-executor.cc
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/runtime/test-env.cc
M be/src/service/fragment-exec-state.cc
M be/src/service/fragment-exec-state.h
M be/src/service/fragment-mgr.cc
M be/src/service/fragment-mgr.h
M be/src/service/impala-internal-service.h
M be/src/util/counting-barrier.h
M common/thrift/ImpalaInternalService.thrift
21 files changed, 510 insertions(+), 268 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala refs/changes/90/3390/4
-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Jim Apple (Code Review)" <ge...@cloudera.org>.
Jim Apple has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 5:

Please update to using the new gerrit project, "Impala-ASF".
Instructions are here:

https://cwiki.apache.org/confluence/display/IMPALA/How+to+switch+to+Apache-hosted+git

Pushes to this project will be disabled on October 1.

-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 5
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Jim Apple <jb...@cloudera.com>
Gerrit-Reviewer: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-HasComments: No

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Marcel Kornacker (Code Review)" <ge...@cloudera.org>.
Marcel Kornacker has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 5:

(28 comments)

haven't looked at coordinator.{cc,h} yet

http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/runtime/plan-fragment-executor.cc
File be/src/runtime/plan-fragment-executor.cc:

Line 99:   bool fragment_has_reserved_resource = fragment_instance_ctx.__isset.reserved_resource;
need to distinguish between fragments and their instances


Line 126:     if (is_first) {
could do this directly in the Fragment/QueryMgr


Line 207:   RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), query_ctx.desc_tbl, &desc_tbl));
let's avoid this duplication across fragment instances


http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/runtime/runtime-state.cc
File be/src/runtime/runtime-state.cc:

Line 72:     query_ctx_(query_ctx),
let's not make copies of that stuff, in particular the thrift version of the descriptor table (which we don't use during execution anyway).

we need to create a way to manage query-duration state.


http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/service/fragment-exec-state.cc
File be/src/service/fragment-exec-state.cc:

Line 121:       if (rpc_status.IsTryAgain()) {
is that from a rebase?


http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/service/fragment-exec-state.h
File be/src/service/fragment-exec-state.h:

Line 74:   // TODO-MT: Make fragment_ctx_ a const shared_ptr to share it between instances.
remove, too speculative


http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/service/fragment-mgr.cc
File be/src/service/fragment-mgr.cc:

Line 51:   std::shared_ptr<const TQueryCtx> query_ctx =
we don't use std:: in .cc


Line 52:       std::make_shared<const TQueryCtx>(exec_params.query_ctx);
are there other pieces of query-wide state that need to stick around until the last fragment instance has finished?

we should wrap those into a QueryExecState (or something else; that name is already in use).
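
A rough shape for such a wrapper, as a sketch only (the name and members
are hypothetical, following the shared_ptr<const TQueryCtx> used above and
the descriptor-table sharing suggested earlier in this review):

  #include <memory>

  // Hypothetical query-wide state holder; each fragment instance keeps a
  // shared_ptr so the state lives until the last instance finishes.
  struct QueryWideState {
    std::shared_ptr<const TQueryCtx> query_ctx;     // from the RPC params
    std::shared_ptr<const DescriptorTbl> desc_tbl;  // built once per query
  };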


Line 68:       exec_params.fragment_instance_ctxs[fragment_idx];
formatting


Line 95:       lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
do we still need a per-instance map? I don't think we need it for cancellation


Line 104:     // TODO: manage threads via global thread pool?
no, let's not. please remove


Line 105:     const TUniqueId& fragment_id = exec_state->fragment_instance_id();
naming


Line 111:   return_val.all_instance_start_latency_ms =
never assign to thrift struct fields directly, use __set instead
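
For illustration, a minimal sketch of the __set convention, using the
all_instance_start_latency_ms field from this patch (the surrounding names
are assumed for the example):

  // Direct assignment leaves the generated __isset flag false, so an
  // optional field can be silently dropped during serialization:
  return_val.all_instance_start_latency_ms = latency_ms;

  // The generated setter updates the value and the __isset flag together:
  return_val.__set_all_instance_start_latency_ms(latency_ms);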


Line 115:   if(status.IsMemLimitExceeded()) {
formatting


Line 124:       << " fragment instances started";
formatting


Line 131:   std::shared_ptr<FragmentExecState> exec_state =
remove (and elsewhere)


http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/service/fragment-mgr.h
File be/src/service/fragment-mgr.h:

Line 29: /// result of ExecPlanFragments() RPCs that arrive via the internal Impala interface.
point out relationship with PlanFragmentExecutor

also, since we now start execution of all fragment instances of a single query together (or at least this class sees all of those), this might better be rebranded QueryFragmentMgr.


Line 33: /// Fragments are Prepare()'d in that thread, and then Exec() is called. At any point a
you need to differentiate carefully now between fragments and their instances. in this case, you're talking about the instance.

please fix that everywhere it applies (= not just in this file).


Line 60:   class FragmentExecState;
is this per fragment or per instance?


Line 78:   FragmentExecStateMap;
indentation


http://gerrit.cloudera.org:8080/#/c/3390/5/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

Line 366:   // TODO-MT: This is currently unused but will be needed for multi-threading.
why will it be needed?


Line 380:   // required in V1
superfluous comment (the protocol version will always be required, in each version of the protocol)


Line 386:   // List of contexts of this RPC's fragments.
not really necessary, simply paraphrases the declaration


Line 390:   // total number of fragment instances, the query context, etc.
also not necessary (we shouldn't repeat the content of TPlanFragmentInstanceCtx here anyway)


Line 399:   2: optional i64 all_instance_start_latency_ms
start_latency or startup_latency is fine (if it were a particular fragment we'd want to point that out).

why not make that a float? that way we don't have to change anything if the latency drops below 1ms (which may happen with plan caching)


Line 640:   TExecPlanFragmentsResult ExecPlanFragments(1:TExecPlanFragmentsParams params);
rename to ExecQueryFragments?


Line 649:   TCancelPlanFragmentResult CancelPlanFragment(1:TCancelPlanFragmentParams params);
does it still make sense to call this for each fragment instance?


http://gerrit.cloudera.org:8080/#/c/3390/5/common/thrift/generate_error_codes.py
File common/thrift/generate_error_codes.py:

Line 278:   ("TRY_AGAIN_LATER", 90, ""),
later is superfluous


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 5
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Lars Volker (Code Review)" <ge...@cloudera.org>.
Lars Volker has uploaded a new patch set (#5).

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................

PREVIEW IMPALA-2550: RPC batching

This change replaces the ExecPlanFragment RPC with an ExecPlanFragments RPC,
which coalesces the startup of multiple fragment instances into a single call.

TODO: elaborate

Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
---
M be/src/common/status.h
M be/src/exec/hdfs-scan-node.cc
M be/src/exec/kudu-scan-node-test.cc
M be/src/exec/kudu-table-sink-test.cc
M be/src/exec/union-node.cc
M be/src/exprs/expr-test.cc
M be/src/runtime/buffered-block-mgr-test.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/plan-fragment-executor.cc
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/runtime/test-env.cc
M be/src/service/fragment-exec-state.cc
M be/src/service/fragment-exec-state.h
M be/src/service/fragment-mgr.cc
M be/src/service/fragment-mgr.h
M be/src/service/impala-internal-service.h
M be/src/util/counting-barrier.h
M common/thrift/ImpalaInternalService.thrift
M common/thrift/generate_error_codes.py
23 files changed, 629 insertions(+), 378 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala refs/changes/90/3390/5
-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 5
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Marcel Kornacker (Code Review)" <ge...@cloudera.org>.
Marcel Kornacker has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 5:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/3390/5/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

Line 1: // Copyright 2012 Cloudera Inc.
this file doesn't look like it's ready for review. lots of functions are missing comments and the structure has an unfinished feel to it (please correct me if that wasn't the intention)


Line 93: // Helper functions. TODO: move to a separate module?
all of these need comments


Line 97: void CollectOutputSinkTableIds(const TPlanFragment& fragment,
why plural? also, why not just return it?


Line 106: void CollectTupleIds(const TPlanFragment& fragment,
this only collects tuple ids from scans, so maybe name it CollectScanTupleIds.


Line 144:     const std::unordered_set<TTupleId>& tuple_ids,
remove std::


Line 205:   fragment_instance_ctx->fragment_instance_id =
never hurts to use __set


Line 421: class Coordinator::RpcBuilder {
why a class instead of a function?


Line 423:   static void BuildSingleFragmentInstance(const TQueryCtx& query_ctx,
needs comment


Line 457:   instance_state_idx(instance_state_idx), fragment_idx(fragment_idx),
formatting


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 5
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Jim Apple (Code Review)" <ge...@cloudera.org>.
Jim Apple has abandoned this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Abandoned

-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: abandon
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 5
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Jim Apple <jb...@cloudera.com>
Gerrit-Reviewer: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 4:

(13 comments)

Started to review this, but there are lots of TODOs and missing comments and so on - do you want to ping me again when you think this is ready for a closer look?

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 422: const FragmentExecParams& fragment_exec_params = (*schedule.exec_params())[0];
             :     TQueryCtx query_ctx = query_ctx_;
             :     // TODO is this line needed?
             :     query_ctx.__set_query_id(query_id_);
             :     TPlanFragmentCtx fragment_ctx;
             :     fragment_ctx.__set_fragment(request.fragments[0]);
             :     fragment_ctx.__set_num_fragment_instances(fragment_exec_params.instance_ids.size());
             :     TPlanFragmentInstanceCtx fragment_instance_ctx;
             :     SetFragmentInstanceCtx(schedule, request.fragments[0], fragment_exec_params, 0, 0, 0, coord, &fragment_instance_ctx);
             :     unordered_set<TTupleId> tuple_ids;
             :     CollectTupleIds(request.fragments[0], &tuple_ids);
             :     unordered_set<TTableId> table_ids;
             :     CollectOutputSinkTableIds(request.fragments[0], &table_ids);
             :     PopulateSharedDescriptorTable(tuple_ids, table_ids, &query_ctx);
             :     RETURN_IF_ERROR(executor_->Prepare(query_ctx, fragment_ctx, fragment_instance_ctx));
this block is very hard to read (please also note the long line, here and elsewhere). I think there is some opportunity to factor out fragment creation into some kind of builder or factory class.


PS4, Line 607: RPC's
nit: RPCs


PS4, Line 612: bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this,
             :           backend_address,
             :           debug_options,
             :           backend_exec_data,
             :           schedule
fwiw, I think lambdas are going to be easier to read here:

  ...->Offer([&] { this->ExecRemoteFragments(backend_address, debug_options, backend_exec_data, schedule); });

Or something.


PS4, Line 1395: batch_rpc_params
just rpc_params, I think.


PS4, Line 1404: // Notification barrier
              :   NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
              :       backend_exec_data->fragment_instances.size());
please put this back at the top of the method, to avoid bugs creeping in where someone returns without signalling the barrier.
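
In sketch form, the reason to declare the guard first (NotifyBarrierOnExit
comes from be/src/util/counting-barrier.h in this patch; the method body
below is illustrative only):

  void Coordinator::ExecRemoteFragments(/* parameters elided */) {
    // Declared before any other statement so that every return path,
    // including early error returns, notifies the barrier when the guard
    // is destroyed at scope exit.
    NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
        backend_exec_data->fragment_instances.size());
    // ... build and send the RPCs; any 'return' taken below still
    // decrements the barrier via ~NotifyBarrierOnExit().
  }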


PS4, Line 1424: int instance_state_idx = fragment_instance.instance_state_idx;
              :     int fragment_idx = fragment_instance.fragment_idx;
              :     int fragment_instance_idx = fragment_instance.fragment_instance_idx;
try and keep these declarations as close to their first point of use as possible.


PS4, Line 1456: after
should this comment have said 'before', not 'after'?


PS4, Line 1456: // Guard against concurrent UpdateFragmentExecStatus() that may arrive after RPC returns.
              :     // TODO How to fix this mess? Use std::unique_lock? Unlock the exec_states in
              :     // SetInitialStatus()?
              :     guards.push_back(shared_ptr<lock_guard<mutex>>(new lock_guard<mutex>(*exec_state->lock())));
I agree, we should do something. What would UpdateFragmentExecStatus() access that would be bad concurrently?

How about having the update RPC return the equivalent of EAGAIN, telling the sender to try again when the call can be processed? We'd need a flag in the exec state, but wouldn't need to take a lock.
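
A rough sender-side sketch of that idea (names here are hypothetical except
for IsTryAgain() and the TRY_AGAIN error code, which this patch adds):

  // Hypothetical retry loop: keep resending the status report while the
  // coordinator signals that it cannot process the call yet.
  Status rpc_status;
  do {
    rpc_status = ReportExecStatusRpc(params);  // hypothetical RPC wrapper
    if (rpc_status.IsTryAgain()) SleepForMs(50);  // back off, then retry
  } while (rpc_status.IsTryAgain());
  RETURN_IF_ERROR(rpc_status);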


PS4, Line 1985: void Coordinator::SetFragmentInstanceCtx(QuerySchedule& schedule,
              :     const TPlanFragment& fragment, const FragmentExecParams& params,
              :     int instance_state_idx, int fragment_idx, int fragment_instance_idx,
              :     const TNetworkAddress& coord,
              :     TPlanFragmentInstanceCtx* fragment_instance_ctx) {
              : 
              :   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
              :   if (schedule.HasReservation()) {
              :     // The reservation has already been validated at this point.
              :     TNetworkAddress resource_hostport;
              :     schedule.GetResourceHostport(exec_host, &resource_hostport);
              :     map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
              :         schedule.reservation()->allocated_resources.find(resource_hostport);
              :     // Only set reserved resource if we actually have one for this plan
              :     // fragment. Otherwise, don't set it (usually this is the coordinator fragment), and it
              :     // won't participate in dynamic RM controls.
              :     if (it != schedule.reservation()->allocated_resources.end()) {
              :       fragment_instance_ctx->__set_reserved_resource(it->second);
              :       fragment_instance_ctx->__set_local_resource_address(resource_hostport);
              :     }
              :   }
              :   FragmentScanRangeAssignment::const_iterator it =
              :       params.scan_range_assignment.find(exec_host);
              :   // Scan ranges may not always be set, so use an empty structure if so.
              :   const PerNodeScanRanges& scan_ranges =
              :       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
              : 
              :   fragment_instance_ctx->__set_request_pool(schedule.request_pool());
              :   fragment_instance_ctx->__set_per_node_scan_ranges(scan_ranges);
              :   fragment_instance_ctx->__set_per_exch_num_senders(params.per_exch_num_senders);
              :   fragment_instance_ctx->__set_destinations(params.destinations);
              :   fragment_instance_ctx->__set_sender_id(params.sender_id_base + fragment_instance_idx);
              :   fragment_instance_ctx->fragment_instance_id = params.instance_ids[fragment_instance_idx];
              :   fragment_instance_ctx->fragment_instance_idx = fragment_instance_idx;
              :   fragment_instance_ctx->instance_state_idx = instance_state_idx;
              : }
does this method access any member variables of Coordinator? if not, consider refactoring so that it's a local helper method.
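
What that refactoring typically looks like, sketched (the signature matches
the method quoted above, minus the Coordinator:: qualification; the body is
elided):

  namespace {

  // File-local helper in coordinator.cc: receives everything it needs as
  // parameters instead of reading Coordinator member variables.
  void SetFragmentInstanceCtx(QuerySchedule& schedule,
      const TPlanFragment& fragment, const FragmentExecParams& params,
      int instance_state_idx, int fragment_idx, int fragment_instance_idx,
      const TNetworkAddress& coord,
      TPlanFragmentInstanceCtx* fragment_instance_ctx) {
    // ... same body as quoted above ...
  }

  }  // namespace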


PS4, Line 2022: void Coordinator::CollectTupleIds(const TPlanFragment& fragment,
similarly here, consider moving out of Coordinator. Hopefully we'll find a way to factor out some of this code into a separate module to keep this file manageable.


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 271: boost
let's prefer std:: where possible.


PS4, Line 271: void CollectTupleIds(const TPlanFragment& fragment, boost::unordered_set<TTupleId>* tuple_ids);
             : 
             :   void CollectOutputSinkTableIds(const TPlanFragment& fragment,
             :       boost::unordered_set<TTableId>* table_ids);
             : 
             :   void PopulateSharedDescriptorTable(
             :     const boost::unordered_set<TTupleId>& tuple_ids,
             :     const boost::unordered_set<TTableId>& sink_table_ids,
             :     TQueryCtx* query_ctx);
as mentioned in .cc, these methods don't seem like they belong to Coordinator.


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/util/counting-barrier.h
File be/src/util/counting-barrier.h:

PS4, Line 81: NotifyBarrierOnExit& operator=(const NotifyBarrierOnExit&) = delete;
            :   NotifyBarrierOnExit& operator=(NotifyBarrierOnExit&&) = delete;
we use DISALLOW_COPY_AND_ASSIGN usually.
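
For reference, a sketch of the common form of that macro (the definition in
the codebase may differ in detail):

  // Deletes the copy constructor and the copy-assignment operator; used
  // inside the class body in place of hand-written '= delete' lines.
  #define DISALLOW_COPY_AND_ASSIGN(TypeName) \
    TypeName(const TypeName&) = delete;      \
    TypeName& operator=(const TypeName&) = delete

  // Usage:
  //   DISALLOW_COPY_AND_ASSIGN(NotifyBarrierOnExit);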


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-CR](cdh5-trunk) PREVIEW IMPALA-2550: RPC batching

Posted by "Lars Volker (Code Review)" <ge...@cloudera.org>.
Lars Volker has posted comments on this change.

Change subject: PREVIEW IMPALA-2550: RPC batching
......................................................................


Patch Set 4:

(13 comments)

Thanks for the review. Addressed your comments in PS5.

http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 422: const FragmentExecParams& fragment_exec_params = (*schedule.exec_params())[0];
             :     TQueryCtx query_ctx = query_ctx_;
             :     // TODO is this line needed?
             :     query_ctx.__set_query_id(query_id_);
             :     TPlanFragmentCtx fragment_ctx;
             :     fragment_ctx.__set_fragment(request.fragments[0]);
             :     fragment_ctx.__set_num_fragment_instances(fragment_exec_params.instance_ids.size());
             :     TPlanFragmentInstanceCtx fragment_instance_ctx;
             :     SetFragmentInstanceCtx(schedule, request.fragments[0], fragment_exec_params, 0, 0, 0, coord, &fragment_instance_ctx);
             :     unordered_set<TTupleId> tuple_ids;
             :     CollectTupleIds(request.fragments[0], &tuple_ids);
             :     unordered_set<TTableId> table_ids;
             :     CollectOutputSinkTableIds(request.fragments[0], &table_ids);
             :     PopulateSharedDescriptorTable(tuple_ids, table_ids, &query_ctx);
             :     RETURN_IF_ERROR(executor_->Prepare(query_ctx, fragment_ctx, fragment_instance_ctx));
> this block is very hard to read (please also note the long line, here and e
Factored out creation of the RPC parameters here. I could not see an obvious way to factor out the creation of remote RPC parameters because it is intertwined with the filter and coordinator logic, so I left it in place there.


PS4, Line 607: RPC's
> nit: RPCs
Done


PS4, Line 612: bind<void>(mem_fn(&Coordinator::ExecRemoteFragments), this,
             :           backend_address,
             :           debug_options,
             :           backend_exec_data,
             :           schedule
> fwiw, I think lambdas are going to be easier to read here:
Done. We need to capture backend_address by value, which might make the code slightly harder to read.
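
For reference, roughly what the lambda version looks like (sketch; the
thread-pool expression is elided as in the review comment above, and
everything is captured by value, just as boost::bind copied its arguments):

  // backend_address in particular must be captured by value so that the
  // copy outlives the enclosing scope once the lambda runs on a pool thread.
  ...->Offer([this, backend_address, debug_options, backend_exec_data,
      schedule]() {
    ExecRemoteFragments(backend_address, debug_options, backend_exec_data,
        schedule);
  });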


PS4, Line 1395: batch_rpc_params
> just rpc_params, I think.
Done


PS4, Line 1404: // Notification barrier
              :   NotifyBarrierOnExit notifier(exec_complete_barrier_.get(),
              :       backend_exec_data->fragment_instances.size());
> please put this back at the top of the method, to avoid bugs creeping in wh
Done


PS4, Line 1424: int instance_state_idx = fragment_instance.instance_state_idx;
              :     int fragment_idx = fragment_instance.fragment_idx;
              :     int fragment_instance_idx = fragment_instance.fragment_instance_idx;
> try and keep these declarations as close to their first point of use as pos
Done


PS4, Line 1456: after
> should this comment have said 'before', not 'after'?
Removed.


PS4, Line 1456: // Guard against concurrent UpdateFragmentExecStatus() that may arrive after RPC returns.
              :     // TODO How to fix this mess? Use std::unique_lock? Unlock the exec_states in
              :     // SetInitialStatus()?
              :     guards.push_back(shared_ptr<lock_guard<mutex>>(new lock_guard<mutex>(*exec_state->lock())));
> I agree, we should do something. What would UpdateFragmentExecStatus() acce
Done


PS4, Line 1985: void Coordinator::SetFragmentInstanceCtx(QuerySchedule& schedule,
              :     const TPlanFragment& fragment, const FragmentExecParams& params,
              :     int instance_state_idx, int fragment_idx, int fragment_instance_idx,
              :     const TNetworkAddress& coord,
              :     TPlanFragmentInstanceCtx* fragment_instance_ctx) {
              : 
              :   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
              :   if (schedule.HasReservation()) {
              :     // The reservation has already been validated at this point.
              :     TNetworkAddress resource_hostport;
              :     schedule.GetResourceHostport(exec_host, &resource_hostport);
              :     map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
              :         schedule.reservation()->allocated_resources.find(resource_hostport);
              :     // Only set reserved resource if we actually have one for this plan
              :     // fragment. Otherwise, don't set it (usually this is the coordinator fragment), and it
              :     // won't participate in dynamic RM controls.
              :     if (it != schedule.reservation()->allocated_resources.end()) {
              :       fragment_instance_ctx->__set_reserved_resource(it->second);
              :       fragment_instance_ctx->__set_local_resource_address(resource_hostport);
              :     }
              :   }
              :   FragmentScanRangeAssignment::const_iterator it =
              :       params.scan_range_assignment.find(exec_host);
              :   // Scan ranges may not always be set, so use an empty structure if so.
              :   const PerNodeScanRanges& scan_ranges =
              :       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
              : 
              :   fragment_instance_ctx->__set_request_pool(schedule.request_pool());
              :   fragment_instance_ctx->__set_per_node_scan_ranges(scan_ranges);
              :   fragment_instance_ctx->__set_per_exch_num_senders(params.per_exch_num_senders);
              :   fragment_instance_ctx->__set_destinations(params.destinations);
              :   fragment_instance_ctx->__set_sender_id(params.sender_id_base + fragment_instance_idx);
              :   fragment_instance_ctx->fragment_instance_id = params.instance_ids[fragment_instance_idx];
              :   fragment_instance_ctx->fragment_instance_idx = fragment_instance_idx;
              :   fragment_instance_ctx->instance_state_idx = instance_state_idx;
              : }
> does this method access any member variables of Coordinator? if not, consid
Done


PS4, Line 2022: void Coordinator::CollectTupleIds(const TPlanFragment& fragment,
> similarly here, consider moving out of Coordinator. Hopefully we'll find a 
Done


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

PS4, Line 271: boost
> let's prefer std:: where possible.
Done


PS4, Line 271: void CollectTupleIds(const TPlanFragment& fragment, boost::unordered_set<TTupleId>* tuple_ids);
             : 
             :   void CollectOutputSinkTableIds(const TPlanFragment& fragment,
             :       boost::unordered_set<TTableId>* table_ids);
             : 
             :   void PopulateSharedDescriptorTable(
             :     const boost::unordered_set<TTupleId>& tuple_ids,
             :     const boost::unordered_set<TTableId>& sink_table_ids,
             :     TQueryCtx* query_ctx);
> as mentioned in .cc, these methods don't seem like they belong to Coordinat
Done


http://gerrit.cloudera.org:8080/#/c/3390/4/be/src/util/counting-barrier.h
File be/src/util/counting-barrier.h:

PS4, Line 81: NotifyBarrierOnExit& operator=(const NotifyBarrierOnExit&) = delete;
            :   NotifyBarrierOnExit& operator=(NotifyBarrierOnExit&&) = delete;
> we use DISALLOW_COPY_AND_ASSIGN usually.
Done


-- 
To view, visit http://gerrit.cloudera.org:8080/3390
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8e3a9d4a78b59394aaf343df08f6bfda22c3148e
Gerrit-PatchSet: 4
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Lars Volker <lv...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <ta...@cloudera.com>
Gerrit-HasComments: Yes