Posted to reviews@impala.apache.org by "Henry Robinson (Code Review)" <ge...@cloudera.org> on 2017/06/07 16:47:50 UTC

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Henry Robinson has uploaded a new change for review.

  http://gerrit.cloudera.org:8080/7103

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* Because two different backend services now need to be addressed, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools and synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or, in the DataStreamSender case, a dedicated sender-side
  thread for each sender/receiver pair. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Senders no longer
  block, so the complex promise-based machinery is no longer needed.
  Instead, all senders with no receiver are added to a per-receiver
  list, which is processed when the receiver arrives. If the receiver
  does not arrive promptly, the DataStreamManager cleans up the waiting
  senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins cleaning up how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.
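The early-sender handling in the FindRecvr() bullet can be illustrated with a minimal, single-threaded C++ sketch. It is not the patch's DataStreamMgr code: `EarlySenderRegistry`, `PendingSender`, the string receiver key and the explicit millisecond timestamps are hypothetical, and locking is omitted. Senders whose receiver does not exist yet are parked per receiver, registering the receiver hands them back for processing, and a periodic sweep drops any that waited longer than the timeout.

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record of a TransmitData() request that arrived before its
// receiver was created.
struct PendingSender {
  int32_t sender_id;
  int64_t arrival_ms;
};

// Hypothetical sketch of the "early senders" bookkeeping: instead of blocking
// a sender until its receiver exists, park the request in a per-receiver list.
class EarlySenderRegistry {
 public:
  explicit EarlySenderRegistry(int64_t timeout_ms) : timeout_ms_(timeout_ms) {}

  // Returns true if the receiver already exists and the request can be
  // delivered now; otherwise queues the sender and returns false.
  bool OnSenderArrived(const std::string& recvr_key, int32_t sender_id,
                       int64_t now_ms) {
    if (receivers_.count(recvr_key) > 0) return true;
    early_senders_[recvr_key].push_back(PendingSender{sender_id, now_ms});
    return false;
  }

  // Called when the receiver is created: hands back every sender that was
  // waiting for it so they can be processed, and forgets the list.
  std::vector<PendingSender> RegisterReceiver(const std::string& recvr_key) {
    receivers_.insert(recvr_key);
    std::vector<PendingSender> waiters;
    auto it = early_senders_.find(recvr_key);
    if (it != early_senders_.end()) {
      waiters = std::move(it->second);
      early_senders_.erase(it);
    }
    return waiters;
  }

  // Periodic sweep: drops senders that have waited longer than the timeout
  // (playing the role of FLAGS_datastream_sender_timeout_ms) and returns how
  // many were dropped. A real service would presumably also notify each
  // dropped sender with an error.
  int ExpireSenders(int64_t now_ms) {
    int dropped = 0;
    for (auto it = early_senders_.begin(); it != early_senders_.end();) {
      std::vector<PendingSender> kept;
      for (const PendingSender& s : it->second) {
        if (now_ms - s.arrival_ms > timeout_ms_) {
          ++dropped;
        } else {
          kept.push_back(s);
        }
      }
      if (kept.empty()) {
        it = early_senders_.erase(it);
      } else {
        it->second = std::move(kept);
        ++it;
      }
    }
    return dropped;
  }

 private:
  const int64_t timeout_ms_;
  std::set<std::string> receivers_;
  std::map<std::string, std::vector<PendingSender>> early_senders_;
};
```

Passing `now_ms` explicitly keeps the sketch deterministic; a real implementation would read a monotonic clock and protect both maps with a lock.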

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/backend-config-test.cc
M be/src/scheduling/backend-config.cc
M be/src/scheduling/backend-config.h
M be/src/scheduling/query-schedule.h
M be/src/scheduling/scheduler-test-util.cc
M be/src/scheduling/scheduler-test-util.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
M be/src/service/fe-support.cc
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/service/session-expiry-test.cc
M be/src/statestore/statestore-subscriber.cc
M be/src/statestore/statestore-test.cc
M be/src/statestore/statestore.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/blocking-queue.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/network-util.cc
M be/src/util/network-util.h
M be/src/util/thread-pool.h
M bin/bootstrap_toolchain.py
M bin/impala-config.sh
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/StatestoreService.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
M tests/query_test/test_runtime_filters.py
M tests/stress/test_mini_stress.py
88 files changed, 2,765 insertions(+), 2,003 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/1
-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#2).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

88 files changed, 2,773 insertions(+), 2,003 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/2

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#4).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* Because two different backend services now need to be addressed, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools and synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or, in the DataStreamSender case, a dedicated sender-side
  thread for each sender/receiver pair. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Senders no longer
  block, so the complex promise-based machinery is no longer needed.
  Instead, all senders with no receiver are added to a per-receiver
  list, which is processed when the receiver arrives. If the receiver
  does not arrive promptly, the DataStreamManager cleans up the waiting
  senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins cleaning up how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifying the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.
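Replacing thread-per-destination sends with asynchronous calls can be sketched as follows. All names are hypothetical: `FakeRpcExecutor` uses a single std::thread in place of KRPC's internal threads, and `BroadcastAndWait` stands in for a PublishFilter()-style broadcast. The caller issues every call up front and tracks completions with one counter, so the number of caller-side threads does not grow with the number of destinations. Waiting at the end is there only to make the sketch testable; it is an assumption, not a statement about what the coordinator does.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for KRPC's shared threads: one worker runs queued
// completions, no matter how many calls are in flight.
class FakeRpcExecutor {
 public:
  FakeRpcExecutor() : worker_([this] { Run(); }) {}
  ~FakeRpcExecutor() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    cv_.notify_one();
    worker_.join();  // The worker drains any queued completions first.
  }

  // "Sends" an RPC; 'done' later runs on the worker thread.
  void SendAsync(std::function<void()> done) {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push_back(std::move(done));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> l(lock_);
    while (true) {
      cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
      if (queue_.empty()) return;  // Shutting down and nothing left to run.
      std::function<void()> done = std::move(queue_.front());
      queue_.pop_front();
      l.unlock();  // Run the completion without holding the queue lock.
      done();
      l.lock();
    }
  }

  std::mutex lock_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool shutdown_ = false;
  std::thread worker_;  // Declared last so it starts after the other members.
};

// Sends one call per host without a thread per destination: each completion
// decrements a shared counter, and the caller waits for it to reach zero.
int BroadcastAndWait(FakeRpcExecutor* executor,
                     const std::vector<std::string>& hosts) {
  std::mutex lock;
  std::condition_variable all_done;
  int pending = static_cast<int>(hosts.size());
  int acked = 0;
  for (const std::string& host : hosts) {
    (void)host;  // A real call would be addressed to 'host'.
    executor->SendAsync([&] {
      std::lock_guard<std::mutex> l(lock);
      ++acked;
      if (--pending == 0) all_done.notify_one();
    });
  }
  std::unique_lock<std::mutex> l(lock);
  all_done.wait(l, [&] { return pending == 0; });
  return acked;
}
```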

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M be/src/scheduling/backend-config.cc
M be/src/scheduling/backend-config.h
M be/src/scheduling/query-schedule.h
M be/src/scheduling/scheduler-test-util.cc
M be/src/scheduling/scheduler-test-util.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
M be/src/service/fe-support.cc
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/service/session-expiry-test.cc
M be/src/statestore/statestore-subscriber.cc
M be/src/statestore/statestore-test.cc
M be/src/statestore/statestore.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/blocking-queue.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/network-util.cc
M be/src/util/network-util.h
M be/src/util/thread-pool.h
M bin/impala-config.sh
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/StatestoreService.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
M tests/query_test/test_runtime_filters.py
M tests/stress/test_mini_stress.py
85 files changed, 2,771 insertions(+), 1,971 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/4

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 4
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(11 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
Can you please state the context from which 'cb' is called (e.g. a reactor thread), and also add the precautions the caller should take when implementing 'cb' (e.g. no blocking)?
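A sketch of the kind of contract being asked about, assuming (as this comment suggests) that 'cb' runs on a KRPC reactor thread. `AsyncCall`, `CallState`, `OnCallDone` and `WaitForResult` are hypothetical, and a plain std::thread stands in for the reactor thread. The callback only records the result under a short lock and notifies a condition variable, the same shape as the TransmitData() callback in data-stream-sender.cc; anything slow happens on the waiting caller's thread.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical async call: runs 'work' on a separate thread, standing in for a
// KRPC reactor thread, and then invokes 'cb' on that same thread.
std::thread AsyncCall(std::function<int()> work, std::function<void(int)> cb) {
  return std::thread([work = std::move(work), cb = std::move(cb)]() {
    cb(work());
  });
}

// Per-call state shared between the caller and the callback.
struct CallState {
  std::mutex lock;
  std::condition_variable done_cv;
  bool done = false;
  int result = 0;
};

// The callback only records the result and wakes the waiter. It holds a lock
// briefly and never blocks on I/O or on other RPCs, because blocking the
// thread it runs on could delay every other call handled by that thread.
void OnCallDone(CallState* state, int result) {
  {
    std::lock_guard<std::mutex> l(state->lock);
    state->result = result;
    state->done = true;
  }
  state->done_cv.notify_one();
}

// Caller side: waits for completion; any slow follow-up work belongs here.
int WaitForResult(CallState* state) {
  std::unique_lock<std::mutex> l(state->lock);
  state->done_cv.wait(l, [state] { return state->done; });
  return state->result;
}
```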


PS2, Line 133: aattempted
typo


PS2, Line 230: //
///

Same below.


PS2, Line 319: Retries
Is there any way to write a be-test to exercise the retry path?


PS2, Line 322:     auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             :         cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             :         mutable {
An alternative to this lambda implementation would be to define a separate function and use boost::bind() to stash the arguments, right?
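For comparison, here is a hypothetical sketch of the two styles: a lambda with init-captures, as in the patch, and std::bind over a named function (boost::bind works in much the same way). `RetryParams` and `RetryAttempt` are invented for illustration and only format a string.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical retry settings, standing in for the captured 'params'.
struct RetryParams {
  int max_attempts;
  int retry_interval_ms;
};

// Named function: the values the lambda would capture become parameters.
std::string RetryAttempt(const RetryParams& params, int attempt,
                         const std::string& target) {
  return target + " attempt " + std::to_string(attempt) + "/" +
         std::to_string(params.max_attempts);
}

// Option 1: lambda with init-captures (the style used in the patch).
std::function<std::string()> MakeWithLambda(RetryParams params, int attempt,
                                            std::string target) {
  return [params, attempt, target = std::move(target)]() {
    return RetryAttempt(params, attempt, target);
  };
}

// Option 2: std::bind stores copies of the same arguments next to a pointer
// to the named function and passes them in when the result is invoked.
std::function<std::string()> MakeWithBind(RetryParams params, int attempt,
                                          std::string target) {
  return std::bind(&RetryAttempt, params, attempt, std::move(target));
}
```

One caveat: if a captured or bound argument is move-only (for example a std::unique_ptr), neither the lambda nor the bind expression can be stored in a std::function, which requires a copyable target.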


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
Why do we pass Status::OK() for non-retryable error or after exceeding the maximum number of retries ?
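One way the callback could receive the real outcome instead of Status::OK() is sketched below. `SimpleStatus` and `CallWithRetries` are hypothetical and synchronous for clarity, and treating "server busy" as the retryable case is an assumption. The key point is that 'cb' receives the status of the last attempt, so a non-retryable error or exhausted retries are reported as failures. A loop like this could also support a be-test for the retry path raised above, by injecting an attempt function that fails a fixed number of times.

```cpp
#include <functional>
#include <string>

// Minimal hypothetical status type; Impala's real Status class differs.
struct SimpleStatus {
  bool ok;
  bool retryable;  // Assumption: e.g. the server was too busy to accept.
  std::string msg;
};

// Retries 'attempt' while it fails with a retryable error, up to
// 'max_attempts' calls in total, then hands 'cb' the status of the LAST
// attempt rather than a blanket OK. Returns the number of calls made.
int CallWithRetries(const std::function<SimpleStatus()>& attempt,
                    int max_attempts,
                    const std::function<void(const SimpleStatus&)>& cb) {
  SimpleStatus status{false, false, "not attempted"};
  int num_attempts = 0;
  while (num_attempts < max_attempts) {
    ++num_attempts;
    status = attempt();
    if (status.ok || !status.retryable) break;
    // A real implementation would wait for the retry interval here, ideally
    // via a timer rather than by sleeping on a reactor thread.
  }
  cb(status);
  return num_attempts;
}
```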


Line 337:       kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
long line


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 265: 10
Would you mind adding a comment above explaining what 10 stands for?


PS1, Line 266: numeric_limits<int32_t>::max()
Why is this not FLAGS_datastream_sender_timeout_ms ?


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             :       instance_id = fragment_instance_id_, proto_batch = batch]
             :       (const Status& status, TransmitDataRequestPb* request,
             :       TransmitDataResponsePb* response, RpcController* controller) {
             : 
             :     // Ensure that request and response get deleted when this callback returns.
             :     auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             :     auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             : 
             :     // Check if this channel still exists.
             :     auto channel = self_ptr.lock();
             :     if (!channel) return;
             :     {
             :       lock_guard<SpinLock> l(channel->lock_);
             :       Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             : 
             :       int32_t status_code = response->status().status_code();
             :       channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             : 
             :       if (!rpc_status.ok()) {
             :         channel->last_rpc_status_ = rpc_status;
             :       } else if (!channel->recvr_gone_) {
             :         if (status_code != TErrorCode::OK) {
             :           // Don't bubble up the 'receiver gone' status, because it's not an error.
             :           channel->last_rpc_status_ = Status(response->status());
             :         } else {
             :           int size = proto_batch->GetSize();
             :           channel->num_data_bytes_sent_.Add(size);
             :           VLOG_ROW << "incremented #data_bytes_sent="
             :                    << channel->num_data_bytes_sent_.Load();
             :         }
             :       }
             :       channel->rpc_in_flight_ = false;
             :     }
             :     channel->rpc_done_cv_.notify_one();
             :   };
I am no C++ expert, so this question may be stupid: could we write this as a named function instead of a lambda? I am not sure how well gdb handles lambda functions when compiled with optimization, and this callback seems important enough that one may want to inspect its state in a core dump if necessary.
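If the callback were written as a named type rather than a lambda, it might look like the hypothetical sketch below. `Channel` is a heavily simplified stand-in for DataStreamSender::Channel, `TransmitDoneCallback` is an invented name, and the RPC outcome is reduced to two booleans. The weak_ptr check mirrors `self_ptr.lock()` in the quoted lambda: the callback does nothing if the channel has already been destroyed.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>

// Hypothetical, heavily simplified stand-in for DataStreamSender::Channel.
struct Channel {
  std::mutex lock;
  std::condition_variable rpc_done_cv;
  bool rpc_in_flight = true;
  bool recvr_gone = false;
  long num_data_bytes_sent = 0;
};

// Named callback type: the state the lambda captured becomes ordinary data
// members with fixed names.
struct TransmitDoneCallback {
  std::weak_ptr<Channel> channel_ref;  // Does not keep the channel alive.
  long batch_size;

  // 'ok' and 'recvr_gone' summarize the RPC outcome in this sketch.
  void operator()(bool ok, bool recvr_gone) const {
    std::shared_ptr<Channel> channel = channel_ref.lock();
    if (!channel) return;  // Channel already destroyed; nothing to update.
    {
      std::lock_guard<std::mutex> l(channel->lock);
      channel->recvr_gone = recvr_gone;
      if (ok && !recvr_gone) channel->num_data_bytes_sent += batch_size;
      channel->rpc_in_flight = false;
    }
    channel->rpc_done_cv.notify_one();
  }
};
```

Whether this is easier to inspect in gdb than an optimized lambda depends on the compiler and debug-info settings; what the named type does give is stable, readable type and member names.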


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

PS1, Line 63: DataStreamService
nit: Just wondering why we didn't put DataStreamService in data-stream-service.cc?



Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 252: batch->compressed_tuple_data
> The ownership is shared with the batch object. AddSidecar() internally move
I see. I am still getting used to this subtlety of passing a shared_ptr as an argument.
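The ownership subtlety can be shown with a small sketch. `SidecarList` is hypothetical and is not KRPC's sidecar API: taking a std::shared_ptr by value creates a new owner at the call site, and moving that parameter into storage transfers it without touching the reference count, so the caller and the container end up sharing one buffer.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical holder that keeps payload buffers alive until an RPC has been
// sent. This is an illustration only, NOT KRPC's sidecar API.
class SidecarList {
 public:
  // The by-value parameter is a new owner created at the call site; moving it
  // into the vector hands that ownership over without another refcount bump.
  int Add(std::shared_ptr<std::string> buffer) {
    sidecars_.push_back(std::move(buffer));
    return static_cast<int>(sidecars_.size()) - 1;  // Index of new sidecar.
  }

  // Releases this list's references; buffers still owned elsewhere survive.
  void Clear() { sidecars_.clear(); }

 private:
  std::vector<std::shared_ptr<std::string>> sidecars_;
};
```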



Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(10 comments)

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

Line 61:           bool unused = false;
> Doesn't the contract for FindRecvr() state that we need to hold 'lock_' bef
Done


Line 105:   EarlySendersList waiters;
> Add brief comment:
Done


PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id);
> According to the header comment in data-stream-mgr.h, a sender shouldn't be
Done


PS1, Line 300: early_senders_
> Assume the following case:
The sender fragment instance would fail, and then the coordinator should cancel the receiver. 

I believe there's an outstanding issue where, if the coordinator fails to cancel a fragment instance, the fragment instance will not fail itself. I'm going to file a JIRA for that, but it's unrelated to KRPC.


Line 321:     // Wait for 10s
> Add a brief comment stating that this is to check if the DataStreamMgr is b
Done


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS1, Line 81: will do one of three things
> nit: would be nice to format them as bullet points.
Done


PS1, Line 83: if the buffer is full
> "if the batch queues are full"?
Done


PS1, Line 87: the sender
> "the sender along with its payload" ?
Done


Line 224:   /// has not yet prepared 'payload' is queued until it arrives, or is timed out. If the
> nit: been prepared,
Done


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS1, Line 255: void UpdateFilter
> Leave a TODO stating that this should move to query-state.h/cc after IMPALA
I'm going to leave that for now, since I don't want to make design decisions for IMPALA-3825 in this patch.



Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

This patch passes core, exhaustive and ASAN tests. It can execute 32 concurrent streams of TPCDS-Q17 @ scale factor 30000 on a 138-node cluster with Kerberos enabled. (I don't believe the previous implementation could do this effectively because of the number of Thrift connections required). 

Some perf results from a 20-node cluster:

+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TPCH(_300) | TPCH-Q3  | parquet / none / none | 32.55  | 28.18       |   +15.51%  |   4.71%   |   1.17%        | 1           | 3     |
| TPCH(_300) | TPCH-Q13 | parquet / none / none | 24.43  | 22.21       |   +9.99%   |   0.61%   |   0.70%        | 1           | 3     |
| TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.53   | 7.05        |   +6.69%   |   1.70%   |   2.09%        | 1           | 3     |
| TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.35   | 6.04        |   +5.19%   |   0.37%   |   0.76%        | 1           | 3     |
| TPCH(_300) | TPCH-Q14 | parquet / none / none | 4.28   | 4.10        |   +4.36%   |   0.03%   |   0.73%        | 1           | 3     |
| TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.53   | 3.41        |   +3.69%   |   0.61%   |   1.42%        | 1           | 3     |
| TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.09   | 5.87        |   +3.63%   |   0.15%   |   1.78%        | 1           | 3     |
| TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.73   | 1.70        |   +2.22%   |   0.10%   |   0.95%        | 1           | 3     |
| TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 | 103.71      |   +2.06%   |   0.57%   |   0.44%        | 1           | 3     |
| TPCH(_300) | TPCH-Q9  | parquet / none / none | 30.76  | 30.46       |   +1.00%   |   2.57%   |   1.22%        | 1           | 3     |
| TPCH(_300) | TPCH-Q1  | parquet / none / none | 22.14  | 21.94       |   +0.91%   |   0.81%   |   0.86%        | 1           | 3     |
| TPCH(_300) | TPCH-Q4  | parquet / none / none | 5.09   | 5.05        |   +0.79%   |   0.48%   |   2.54%        | 1           | 3     |
| TPCH(_300) | TPCH-Q18 | parquet / none / none | 31.76  | 32.54       |   -2.39%   |   0.44%   |   0.03%        | 1           | 3     |
| TPCH(_300) | TPCH-Q2  | parquet / none / none | 1.98   | 2.04        |   -2.74%   |   7.17%   |   7.41%        | 1           | 3     |
| TPCH(_300) | TPCH-Q5  | parquet / none / none | 47.62  | 48.98       |   -2.79%   |   0.51%   |   0.16%        | 1           | 3     |
| TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.18   | 3.27        |   -2.89%   |   1.34%   |   1.98%        | 1           | 3     |
| TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.32   | 1.37        |   -3.72%   |   0.03%   |   4.00%        | 1           | 3     |
| TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.00   | 9.48        |   -5.06%   |   0.16%   |   0.69%        | 1           | 3     |
| TPCH(_300) | TPCH-Q17 | parquet / none / none | 5.16   | 5.75        |   -10.18%  |   6.44%   |   2.63%        | 1           | 3     |
| TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.01   | 3.39        |   -11.38%  |   2.43%   |   0.06%        | 1           | 3     |
| TPCH(_300) | TPCH-Q19 | parquet / none / none | 25.20  | 28.82       | I -12.57%  |   0.01%   |   0.75%        | 1           | 3     |
| TPCH(_300) | TPCH-Q7  | parquet / none / none | 45.32  | 61.16       | I -25.91%  |   0.55%   |   2.22%        | 1           | 3     |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Primitives (note the significant regression in primitive_many_independent_fragments, which needs further attention):

+---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload            | Query                                                  | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TARGETED-PERF(_300) | primitive_many_independent_fragments                   | parquet / none / none | 377.69 | 189.40      | R +99.42%  |   0.32%   |   0.22%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_bigint_in_list                        | parquet / none / none | 0.95   | 0.89        |   +6.16%   |   0.01%   |   0.44%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_many_fragments                               | parquet / none / none | 60.87  | 57.83       |   +5.25%   |   1.00%   |   0.54%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_groupby_bigint_highndv                       | parquet / none / none | 25.53  | 24.69       |   +3.40%   |   1.30%   |   0.92%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_top-n_all                                    | parquet / none / none | 40.68  | 39.36       |   +3.35%   |   0.75%   |   1.49%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_exchange_broadcast                           | parquet / none / none | 84.74  | 82.23       |   +3.05%   |   5.18%   |   0.13%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_intrinsic_appx_median                        | parquet / none / none | 35.04  | 34.21       |   +2.42%   |   1.01%   |   0.96%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_string_like                           | parquet / none / none | 5.48   | 5.38        |   +1.74%   |   0.94%   |   0.93%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_groupby_bigint_pk                            | parquet / none / none | 95.74  | 94.35       |   +1.47%   |   0.82%   |   2.74%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_decimal_arithmetic                           | parquet / none / none | 100.51 | 99.40       |   +1.12%   |   0.72%   |   1.62%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_conjunct_ordering_1                          | parquet / none / none | 4.73   | 4.68        |   +1.04%   |   0.04%   |   1.12%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_groupby_bigint_lowndv                        | parquet / none / none | 3.34   | 3.32        |   +0.48%   |   2.30%   |   0.00%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_orderby_bigint_expression                    | parquet / none / none | 18.49  | 18.42       |   +0.41%   |   2.91%   |   4.33%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_decimal_selective                     | parquet / none / none | 0.60   | 0.60        |   +0.22%   |   0.27%   |   0.35%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_shuffle_join_one_to_many_string_with_groupby | parquet / none / none | 240.84 | 241.03      |   -0.08%   |   0.46%   |   0.44%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_conjunct_ordering_3                          | parquet / none / none | 1.02   | 1.02        |   -0.35%   |   0.07%   |   0.34%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_orderby_all                                  | parquet / none / none | 54.61  | 54.84       |   -0.42%   |   1.48%   |   1.04%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_empty_build_join_1                           | parquet / none / none | 6.61   | 6.65        |   -0.72%   |   0.02%   |   0.78%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_conjunct_ordering_4                          | parquet / none / none | 0.86   | 0.88        |   -1.76%   |   0.28%   |   3.07%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_groupby_decimal_lowndv.test                  | parquet / none / none | 3.39   | 3.47        |   -2.24%   |   2.29%   |   1.41%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_intrinsic_to_date                            | parquet / none / none | 79.05  | 81.70       |   -3.24%   |   0.42%   |   0.36%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_string_selective                      | parquet / none / none | 0.53   | 0.55        |   -4.82%   |   4.22%   |   0.39%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_broadcast_join_2                             | parquet / none / none | 4.07   | 4.36        |   -6.68%   |   0.66%   |   0.45%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_broadcast_join_3                             | parquet / none / none | 49.97  | 53.78       |   -7.09%   |   0.45%   |   0.45%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_topn_bigint                                  | parquet / none / none | 5.13   | 5.52        |   -7.12%   |   0.68%   |   0.06%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_bigint_selective                      | parquet / none / none | 0.37   | 0.40        |   -7.23%   |   7.01%   |   0.28%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_broadcast_join_1                             | parquet / none / none | 0.96   | 1.04        |   -7.25%   |   0.08%   |   7.60%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_string_non_selective                  | parquet / none / none | 0.90   | 0.98        |   -7.67%   |   5.27%   |   2.36%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_decimal_non_selective                 | parquet / none / none | 0.85   | 0.93        |   -8.51%   |   0.06%   |   2.42%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_orderby_bigint                               | parquet / none / none | 14.56  | 16.13       | I -9.72%   |   0.44%   |   0.12%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_conjunct_ordering_2                          | parquet / none / none | 24.32  | 27.75       | I -12.36%  |   0.28%   |   0.63%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_groupby_decimal_highndv                      | parquet / none / none | 21.17  | 24.49       |   -13.58%  |   0.35%   |   1.96%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_exchange_shuffle                             | parquet / none / none | 69.10  | 80.03       |   -13.66%  |   0.10%   |   1.22%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_shuffle_join_union_all_with_groupby          | parquet / none / none | 56.55  | 67.01       | I -15.61%  |   1.06%   |   0.23%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_conjunct_ordering_5                          | parquet / none / none | 28.41  | 35.20       |   -19.28%  |   5.12%   |   6.49%        | 1           | 3     |
| TARGETED-PERF(_300) | primitive_filter_bigint_non_selective                  | parquet / none / none | 1.08   | 1.65        | I -34.87%  |   7.17%   |   3.12%        | 1           | 3     |
+---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Sailesh Mukil (Code Review)" <ge...@cloudera.org>.
Sailesh Mukil has uploaded a new patch set (#6).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support addressing two different backend services, a data service
  port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus
  synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like
  filter propagation, or a dedicated thread per sender/receiver pair on
  the sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Senders no longer
  block, so there is no need for the complex promise-based machinery.
  Instead, all senders with no receiver are added to a per-receiver
  list, which is processed when the receiver arrives. If the receiver
  does not arrive promptly, the DataStreamManager cleans up the waiting
  senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.
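A minimal sketch of the early-sender handling described in the FindRecvr() bullet, under stated assumptions: the class and method names (PendingSenderTable, AddEarlySender, RegisterReceiver, ExpireSenders) are hypothetical, receivers are keyed by a plain string rather than Impala's real ids, and the timeout is passed in rather than read from FLAGS_datastream_sender_timeout_ms. It is not the DataStreamManager code; it only illustrates the park / drain-on-arrival / expire pattern.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <string>
#include <vector>

// Hypothetical sketch, not Impala's DataStreamManager: receivers are keyed by
// a plain string here instead of (fragment instance id, plan node id).
struct PendingSender {
  int sender_id;
  std::chrono::steady_clock::time_point arrival;
};

class PendingSenderTable {
 public:
  explicit PendingSenderTable(std::chrono::milliseconds timeout)
      : timeout_(timeout) {}

  // A sender arrived before its receiver was registered: park it in the
  // per-receiver list instead of blocking a thread.
  void AddEarlySender(const std::string& recvr_id, int sender_id,
                      std::chrono::steady_clock::time_point now) {
    pending_[recvr_id].push_back({sender_id, now});
  }

  // The receiver has been created: hand over every sender parked for it.
  std::vector<int> RegisterReceiver(const std::string& recvr_id) {
    std::vector<int> ready;
    auto it = pending_.find(recvr_id);
    if (it == pending_.end()) return ready;
    for (const PendingSender& s : it->second) ready.push_back(s.sender_id);
    pending_.erase(it);
    return ready;
  }

  // Periodic sweep: remove senders that have waited at least 'timeout_'
  // (standing in for FLAGS_datastream_sender_timeout_ms) and return their
  // ids so the caller can fail those requests.
  std::vector<int> ExpireSenders(std::chrono::steady_clock::time_point now) {
    std::vector<int> expired;
    for (auto it = pending_.begin(); it != pending_.end();) {
      std::vector<PendingSender> still_waiting;
      for (const PendingSender& s : it->second) {
        if (now - s.arrival >= timeout_) {
          expired.push_back(s.sender_id);
        } else {
          still_waiting.push_back(s);
        }
      }
      if (still_waiting.empty()) {
        it = pending_.erase(it);
      } else {
        it->second.swap(still_waiting);
        ++it;
      }
    }
    return expired;
  }

 private:
  std::chrono::milliseconds timeout_;
  std::map<std::string, std::vector<PendingSender>> pending_;
};
```

Because callers pass `now` explicitly, the expiry logic can be exercised deterministically; a real implementation would also need locking, since RPC handlers and receiver creation run on different threads.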

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M be/src/scheduling/backend-config.cc
M be/src/scheduling/backend-config.h
M be/src/scheduling/query-schedule.h
M be/src/scheduling/scheduler-test-util.cc
M be/src/scheduling/scheduler-test-util.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/service/session-expiry-test.cc
M be/src/statestore/statestore-subscriber.cc
M be/src/statestore/statestore-test.cc
M be/src/statestore/statestore.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/network-util.cc
M be/src/util/network-util.h
M be/src/util/thread-pool.h
M bin/impala-config.sh
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/StatestoreService.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
M tests/query_test/test_runtime_filters.py
M tests/stress/test_mini_stress.py
83 files changed, 2,793 insertions(+), 1,980 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/6
-- 

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 6
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 5:

(22 comments)

PS4 is a rebase. PS5 includes the review responses (so diff 4->5 if you want to see what changed).

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 106: Ownership is
             :   // shared by the caller, and the RPC subsystem
> Doesn't std::move transfer the ownership so the caller no longer shares the
The shared_ptr is copied in. It's the copy that is then moved into the sidecar list.
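To illustrate the ownership pattern described in this reply, here is a minimal sketch. SidecarList and AddSidecar() below are hypothetical stand-ins, not the KRPC or Impala API: the parameter is a by-value shared_ptr, so the call site makes the copy, and the function moves that copy into its list. The caller's own shared_ptr is never moved from, so ownership is shared rather than transferred.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an RPC object that keeps a list of sidecars.
// Not the KRPC API; it only illustrates the ownership pattern.
class SidecarList {
 public:
  // Takes the shared_ptr by value: the caller's pointer is copied (refcount
  // +1) at the call site, and that copy is moved into the list (no change).
  int AddSidecar(std::shared_ptr<std::string> payload) {
    sidecars_.push_back(std::move(payload));
    return static_cast<int>(sidecars_.size()) - 1;
  }

  // Dropping the list releases only the references the list itself held.
  void Clear() { sidecars_.clear(); }

 private:
  std::vector<std::shared_ptr<std::string>> sidecars_;
};
```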


PS3, Line 143: are owned by the caller
> the ownership is temporarily transferred to the RPC call when this function
I don't think so - the RPC call has pointers, but doesn't have ownership in the sense that it has no responsibility for managing a reference count or freeing the memory.


PS3, Line 147: 
> Having the names 'func', 'cb' and 'cb_wrapper' all close by each other make
Done


PS3, Line 153: 
> Does this move mean that the params_ member is invalid after this call? If 
Done


PS3, Line 327: 
> Maybe name this 'completion_cb'.
Done


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

PS3, Line 389: equest->mutable_bloom_filter()->set_log_heap_space(0);
             :     request->mutable_bloom_filter()->set_directory_sidecar_idx(-1);
             :   }
> Why wouldn't a move capture ensure the same thing?
proto_filter is a const shared_ptr&, so you can't move from it (std::move() on a const reference still copies). Instead, we could have the argument be shared_ptr<ProtoBloomFilter> and move from it here. The two are basically equivalent; it's just a question of where the copy is made.
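The point about moving from a `const shared_ptr&` is a general C++ rule, sketched below with a placeholder ProtoBloomFilter struct (not Impala's type) and two hypothetical helper functions. std::move() on a const reference yields a const rvalue, which selects the copy constructor, so the "move" silently copies; taking the parameter by value moves the copy to the call site and lets the body move for free. In both cases exactly one extra reference is created, matching the "question of where you make the copy" point.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct ProtoBloomFilter {  // placeholder type for this sketch
  int log_heap_space = 0;
};

// Parameter is a const reference: std::move() yields a const rvalue, which
// binds to the copy constructor, so 'kept' is a copy (refcount +1).
std::shared_ptr<ProtoBloomFilter> KeepFromConstRef(
    const std::shared_ptr<ProtoBloomFilter>& filter) {
  std::shared_ptr<ProtoBloomFilter> kept = std::move(filter);  // copies
  return kept;
}

// Parameter taken by value: the copy happens at the call site, and the body
// can then move from its own copy without touching the refcount.
std::shared_ptr<ProtoBloomFilter> KeepFromValue(
    std::shared_ptr<ProtoBloomFilter> filter) {
  std::shared_ptr<ProtoBloomFilter> kept = std::move(filter);  // moves
  return kept;
}
```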


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 1207:       VLOG_QUERY << "Not enough memory to allocate filter: "
              :                  << PrettyPrinter::Print(heap_space, TUnit::BYTES)
              :                  << " (query: " << coord->query_id() << ")";
              :       // Disable, as one missing update means a correct filter cannot be 
> I would add this to the commit message. This means we would take double the
I don't think so. Because params.directory is a sidecar, I don't believe it has been copied since it arrived on the socket. In the Thrift case, the byte stream had to be deserialized into a TBloomFilter; what's happening here is the equivalent 'deserialization' step.

This path should only be taken the first time a filter arrives, and it does briefly keep two copies of the filter around (the sidecar should be destroyed as soon as the RPC is responded to).


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 280: blocked_senders_.front()
> Is this a right way to dispose a unique_ptr?
Good point - release() is clearer, and get() may have been a benign bug.
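For reference, the difference between get() and release() when disposing of a queued unique_ptr, in a minimal sketch. SenderPayload and ReleaseFront() are hypothetical; the real element type of blocked_senders_ is not shown in this thread. release() relinquishes ownership before pop_front() destroys the (now empty) unique_ptr, whereas with get() the unique_ptr would still own the object, so pop_front() would delete it and the returned raw pointer would dangle.

```cpp
#include <cassert>
#include <deque>
#include <memory>

struct SenderPayload {  // hypothetical queued element
  int id;
};

// Removes the front element and transfers ownership of it to the caller,
// who becomes responsible for deleting it.
SenderPayload* ReleaseFront(std::deque<std::unique_ptr<SenderPayload>>* queue) {
  SenderPayload* raw = queue->front().release();  // unique_ptr now holds nullptr
  queue->pop_front();                             // destroys an empty unique_ptr
  return raw;                                     // still points to a live object
}
```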


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 58: 
> Not used.
Done


PS3, Line 60: 
> Not used.
Done


PS3, Line 133: scoped_ptr<RowBatch> batch_;
> No one really calls GetNumDataBytesSent() (except from our BE test). So, I'
We're gaining correctness, so it's worth doing (otherwise, if someone decides to use it in the future, they might run into problems).


PS3, Line 148: 
> A reader of this code might not immediately understand why this class needs
I expanded the comment here.


PS3, Line 170: 
> Why is this set in Init()? Wouldn't it ideally be set it in the constructor
Moved to c'tor.


PS3, Line 175: proto_batch_idx_
> Just want to make sure that this will increase the shared_ptr refcount? It 
Yep - this was a mistake. Removed auto to make it more explicit.
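A minimal sketch of why an explicit type matters here, using a placeholder Batch struct and two hypothetical helpers (not Impala code): binding with auto& aliases the existing shared_ptr and takes no new reference, whereas declaring a std::shared_ptr copy increments the reference count. Which form the original code used is not shown in the thread; this only illustrates the refcount difference the reviewer asked about.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct Batch {};  // placeholder type for this sketch

// auto& creates an alias to the element: no new reference is taken.
long UseCountViaAutoRef(const std::vector<std::shared_ptr<Batch>>& batches) {
  auto& batch = batches[0];
  return batch.use_count();
}

// An explicit shared_ptr declaration copies: refcount +1, which keeps the
// object alive even if the vector element is later reset.
long UseCountViaCopy(const std::vector<std::shared_ptr<Batch>>& batches) {
  std::shared_ptr<Batch> batch = batches[0];
  return batch.use_count();
}
```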


PS3, Line 203: co
> Prefer a more descriptive name "rpc_completion_cb" or something similar.
Done


PS3, Line 214: ck_guard
> channel == nullptr
Done


PS3, Line 252: batch->tuple_data, &idx);
> Is this transferring the ownership to the RPC subsystem ? AddSideCar() inte
The ownership is shared with the batch object. AddSidecar() internally moves from the argument, which is a copy (i.e. its own reference).


PS3, Line 266: .release(), rpc_complete_callback);
> This is a subtle change in behavior from previous Impala version. In partic
Any reasonably conservative timeout runs the risk of false negatives if a sender is blocked.

I agree with your analysis about this being a change in behaviour. In practice, though, here's what I hope will happen: if one write to a node is slow enough to previously trigger the timeout, I would expect the statestore RPCs to also go slow (and they will time out); the node will be marked as offline and the query will be cancelled. 

If only this RPC is slow to write (while all other RPCs to the server are OK), then I agree this query will not time out where it previously would have. The query is still cancellable in that case. My reading of the current implementation is that the query is not cancellable if it becomes blocked in send(), which is one of the reasons the send timeout exists.

RPC cancellation will also help with this.


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

PS3, Line 117:     rei
> DCHECK_EQ
Done


Line 216:       output_batch->compressed_tuple_data->resize(compressed_size);
> Do you think it would be good to add a comment why we don't free the 'tuple
Done


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

PS3, Line 73: != THdfsCompression::NON
> != THdfsCompression::NONE
Done


PS3, Line 441: FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES
> Why not initialize this and the other args below with the default values in
I believe it's clearer to initialize them at the point of declaration. It also avoids duplicating the defaults across multiple constructors.
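A minimal sketch of default member initializers shared between constructors. BatchOptions and capacity_ are hypothetical; only the flush_ member and FlushMode::NO_FLUSH_RESOURCES come from the snippet above, and the FlushMode enum here is a stand-in. Each constructor that does not initialize a member in its own initializer list picks up the declared default, so the defaults are written once.

```cpp
#include <cassert>

enum class FlushMode { NO_FLUSH_RESOURCES, FLUSH_RESOURCES };  // stand-in enum

class BatchOptions {  // hypothetical class
 public:
  BatchOptions() = default;
  explicit BatchOptions(int capacity) : capacity_(capacity) {}

  FlushMode flush() const { return flush_; }
  int capacity() const { return capacity_; }

 private:
  // Default member initializers: used by every constructor that does not
  // initialize the member itself.
  FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES;
  int capacity_ = 1024;
};
```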


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#3).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support addressing two different backend services, a data service
  port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus
  synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like
  filter propagation, or a dedicated thread per sender/receiver pair on
  the sender side in the DataStreamSender case. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Senders no longer
  block, so there is no need for the complex promise-based machinery.
  Instead, all senders with no receiver are added to a per-receiver
  list, which is processed when the receiver arrives. If the receiver
  does not arrive promptly, the DataStreamManager cleans up the waiting
  senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/backend-config-test.cc
M be/src/scheduling/backend-config.cc
M be/src/scheduling/backend-config.h
M be/src/scheduling/query-schedule.h
M be/src/scheduling/scheduler-test-util.cc
M be/src/scheduling/scheduler-test-util.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
M be/src/service/fe-support.cc
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/service/session-expiry-test.cc
M be/src/statestore/statestore-subscriber.cc
M be/src/statestore/statestore-test.cc
M be/src/statestore/statestore.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/blocking-queue.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/network-util.cc
M be/src/util/network-util.h
M be/src/util/thread-pool.h
M bin/bootstrap_toolchain.py
M bin/impala-config.sh
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/StatestoreService.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
M tests/query_test/test_runtime_filters.py
M tests/stress/test_mini_stress.py
88 files changed, 2,773 insertions(+), 2,005 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/3
-- 

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Why do we pass Status::OK() for non-retryable error or after exceeding the 
Re-reading the comments above, 'status' seems to indicate whether the RPC was successfully attempted, so it may be okay. The assumption is that cb will check for a remote error from controller_ptr.


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Sailesh Mukil (Code Review)" <ge...@cloudera.org>.
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 3:

(17 comments)

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 147: func
Having the names 'func', 'cb' and 'cb_wrapper' all close by each other makes some of this slightly more complicated to read.

I would opt for renaming 'func' to 'rpc_method' or something, so that it's crystal clear that it may not be a callback itself.


PS3, Line 153: std::move(params_)
Does this move mean that the params_ member is invalid after this call? If so, it would be good to add a comment where it's declared.
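A minimal sketch of what the moved-from member looks like, assuming (hypothetically) that `params_` is a `std::unique_ptr`; its real type is not shown in this thread. For `unique_ptr` the moved-from member is guaranteed to be null, so any later use must be guarded. For most other standard types it is only "valid but unspecified". `RpcParams` and `RpcBuilder` are illustrative names.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical parameter bundle; the real type of 'params_' is not shown.
struct RpcParams {
  int max_attempts = 3;
};

class RpcBuilder {
 public:
  RpcBuilder() : params_(std::make_unique<RpcParams>()) {}

  // Hands the parameters to the caller. Afterwards 'params_' is null
  // (guaranteed for std::unique_ptr), so the builder must not reuse them.
  std::unique_ptr<RpcParams> TakeParams() { return std::move(params_); }

  bool HasParams() const { return params_ != nullptr; }

 private:
  // Null after TakeParams() has been called.
  std::unique_ptr<RpcParams> params_;
};
```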


PS3, Line 327: cb
Maybe name this 'completion_cb'.


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

PS3, Line 389: Copying proto_filter here ensures that its lifetime will last at least until this
             :   // callback completes.
             :   auto cb = [proto_filter]
Why wouldn't a move capture ensure the same thing?
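Both kinds of capture keep the data alive for as long as the lambda (and any `std::function` wrapping it) lives. The only difference is whether the caller's object is copied or emptied. The sketch below uses a hypothetical byte-vector stand-in for `proto_filter`. One caveat applies: if the captured type were move-only, the lambda would no longer be copyable and so could not be stored in a `std::function`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a serialized filter payload.
using FilterBytes = std::vector<char>;

// Copy capture: the lambda owns its own copy; 'filter' is untouched.
std::function<std::size_t()> MakeCopyCallback(const FilterBytes& filter) {
  return [filter]() { return filter.size(); };
}

// Move capture (C++14 init-capture): the lambda takes over the caller's
// buffer without copying it. A moved-from std::vector is left empty.
std::function<std::size_t()> MakeMoveCallback(FilterBytes& filter) {
  return [moved = std::move(filter)]() { return moved.size(); };
}
```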


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS3, Line 1207:       // Do an explicit copy of the directory: 'params' may have come from an RPC so we
              :       // can't assume ownership of its directory.
              :       bloom_filter_ =
              :           make_unique<ProtoBloomFilter>(params.header, params.directory);
I would add this to the commit message. This means we would pay double the memory cost every time we merge filters, right?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 280: blocked_senders_.front()
Is this the right way to dispose of a unique_ptr?
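For reference, here is a minimal sketch of two ways to dispose of a `unique_ptr` held in a container. It assumes (hypothetically) that `blocked_senders_` is a `std::deque<std::unique_ptr<...>>`; the actual container type is not shown in this excerpt. Popping the element destroys the owned object. Moving it out first transfers ownership to a local, which is destroyed at scope exit. `BlockedSender` is an illustrative type that counts live instances.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical element type that counts live instances so that
// destruction can be observed.
struct BlockedSender {
  inline static int live = 0;
  BlockedSender() { ++live; }
  ~BlockedSender() { --live; }
};

using SenderQueue = std::deque<std::unique_ptr<BlockedSender>>;

// Option 1: pop_front() destroys the unique_ptr, which deletes the sender.
void DropFront(SenderQueue* q) { q->pop_front(); }

// Option 2: move ownership out first (e.g. to respond to the sender), then
// pop the now-null slot. The sender is deleted when 'sender' goes out of scope.
void TakeFront(SenderQueue* q) {
  std::unique_ptr<BlockedSender> sender = std::move(q->front());
  q->pop_front();
  // ... respond to 'sender' here ...
}
```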


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 58: DECLARE_int32(datastream_sender_timeout_ms);
Not used.


PS3, Line 60: DECLARE_int32(state_store_subscriber_port);
Not used.


PS3, Line 133: AtomicInt64 num_data_bytes_sent_
No one really calls GetNumDataBytesSent() (except our BE test), so I'm not sure we gain anything by making this atomic.
Not a big deal, but it might be cheaper to leave it as an int64. Your call though.


PS3, Line 148: self_
A reader of this code might not immediately understand why this class needs to always have a reference to itself. It would be good to explicitly mention this member name in the header where the explanation is given.


PS3, Line 170: self_ = shared_from_this();
Why is this set in Init()? Wouldn't it ideally be set in the constructor?
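One likely constraint is sketched below. It assumes the channel derives from `std::enable_shared_from_this`, which the `shared_from_this()` call suggests but this thread does not confirm. While the constructor runs, no `shared_ptr` owns the object yet, so `shared_from_this()` cannot be used there. Since C++17 that call throws `std::bad_weak_ptr`; before C++17 it is undefined behavior. A separate `Init()`, called after `make_shared`, avoids the problem. `Channel` and `Close()` are illustrative names.

```cpp
#include <cassert>
#include <memory>

// Hypothetical channel that keeps itself alive while an RPC is in flight.
class Channel : public std::enable_shared_from_this<Channel> {
 public:
  Channel() {
    // No shared_ptr owns *this yet, so shared_from_this() cannot be used
    // here: it throws std::bad_weak_ptr in C++17 (undefined behavior before).
  }

  // Must be called after construction, once a shared_ptr owns the object.
  void Init() { self_ = shared_from_this(); }

  // Breaks the self-reference, e.g. once the last RPC has completed.
  void Close() { self_.reset(); }

 private:
  std::shared_ptr<Channel> self_;
};

// Returns true if shared_from_this() throws for an object that no shared_ptr
// owns, which is the situation inside a constructor.
bool SharedFromThisThrowsWhenUnowned() {
  Channel unowned;
  try {
    std::shared_ptr<Channel> p = unowned.shared_from_this();
  } catch (const std::bad_weak_ptr&) {
    return true;
  }
  return false;
}
```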


PS3, Line 175: auto proto_batch
Just want to make sure: this will increase the shared_ptr refcount, right? It should, because it makes an underlying copy of the pointer, but I just want to be sure.
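A minimal sketch confirming the behavior, using a hypothetical `ProtoBatch` stand-in. An init-capture such as `proto_batch = batch` copy-constructs the `shared_ptr`, so the count goes up by one for as long as the callback (or the `std::function` holding it) is alive.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for the serialized row batch.
struct ProtoBatch {
  int size = 0;
};

// Capturing a shared_ptr by copy (here via an init-capture, as in
// 'proto_batch = batch') copy-constructs it, which bumps the refcount.
std::function<int()> MakeCallback(const std::shared_ptr<ProtoBatch>& batch) {
  return [proto_batch = batch]() { return proto_batch->size; };
}
```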


PS3, Line 203: cb
Prefer a more descriptive name "rpc_completion_cb" or something similar.


Line 336:   batch_.reset();
DCHECK(self_.get())


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

Line 216:       output_batch->header.set_compression_type(THdfsCompression::LZ4);
Do you think it would be good to add a comment explaining why we don't free the 'tuple_data' buffer here? Presumably so we can reuse the memory when the RowBatch is recycled?


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.h
File be/src/runtime/row-batch.h:

PS3, Line 73: == THdfsCompression::LZ4
!= THdfsCompression::NONE

Functionally same, more readable.


PS3, Line 441: FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES
Why not initialize this and the other args below with the default values in the constructor member initialization list?
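For comparison, here is a minimal sketch of the two styles. The struct is hypothetical: `FlushMode::NO_FLUSH_RESOURCES` appears in the patch, while the second enumerator and `capacity_` are illustrative. Default member initializers apply to every constructor that does not mention the member. An entry in a constructor's initializer list overrides the default for that constructor only.

```cpp
#include <cassert>

// FlushMode::NO_FLUSH_RESOURCES appears in the patch; FLUSH_RESOURCES and
// 'capacity_' are illustrative.
enum class FlushMode { NO_FLUSH_RESOURCES, FLUSH_RESOURCES };

struct BatchOptions {
  BatchOptions() = default;
  // An entry in the member initializer list overrides the default member
  // initializer for this constructor only.
  explicit BatchOptions(int capacity) : capacity_(capacity) {}

  // Default member initializers: used by every constructor that does not
  // initialize the member itself.
  FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES;
  int capacity_ = 1024;
};
```

The trade-off is mainly locality: a default member initializer keeps the default next to the declaration, whereas an initializer-list default has to be repeated in every constructor that needs it.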


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has abandoned this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Abandoned

-- 

Gerrit-MessageType: abandon
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 6
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mm...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(8 comments)

http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS2, Line 126: 'cb'
> Can you please state the context in which 'cb' is called from (e.g. reactor
Done


PS2, Line 133: aattempted
> typo
Done


PS2, Line 230: //
> ///
Done


PS2, Line 319: Retries
> Is there any way to write a be-test to exercise the retry path ?
Yep - see rpc-mgr-test.cc, RetryAsyncTest. That injects ERROR_SERVER_TOO_BUSY into an RPC response which triggers the retry logic.
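The general shape of such a test can be sketched as follows. This is a hypothetical model, not the actual `rpc-mgr-test.cc` code, and `RpcResult`, `CallWithRetry()` and `InjectBusy()` are illustrative names. An injected "server too busy" result is returned for the first N attempts. A bounded retry loop then either recovers or gives up once its attempts are exhausted.

```cpp
#include <cassert>
#include <functional>

// Hypothetical outcome codes; only SERVER_TOO_BUSY is treated as retryable.
enum class RpcResult { OK, SERVER_TOO_BUSY, FATAL };

// Invokes 'attempt' up to 'max_attempts' times (assumed >= 1), retrying only
// while it reports SERVER_TOO_BUSY. Returns the last result and stores the
// number of attempts made in '*num_attempts'.
RpcResult CallWithRetry(const std::function<RpcResult()>& attempt,
                        int max_attempts, int* num_attempts) {
  RpcResult result = attempt();
  *num_attempts = 1;
  while (result == RpcResult::SERVER_TOO_BUSY && *num_attempts < max_attempts) {
    // A real implementation would wait for a retry interval here.
    result = attempt();
    ++*num_attempts;
  }
  return result;
}

// Fault injection: report SERVER_TOO_BUSY for the first 'busy_count' calls.
std::function<RpcResult()> InjectBusy(int busy_count) {
  return [remaining = busy_count]() mutable {
    return remaining-- > 0 ? RpcResult::SERVER_TOO_BUSY : RpcResult::OK;
  };
}
```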


PS2, Line 322:     auto cb_wrapper = [params = std::move(params), mgr, func, req, resp,
             :         cb = std::move(cb), controller_ptr = controller.release(), num_attempts]()
             :         mutable {
> An alternative to this lambda implementation would be to define a separate 
It would, yeah. My preference is for using a lambda here partly because it's very clear about how the arguments are copied, and how ownership is managed (I find the copying behaviour of bind() a bit inscrutable).
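The point about bind()'s copying behavior can be illustrated with a small sketch (generic code, not from the patch). `std::bind` silently stores copies of its bound arguments unless they are wrapped in `std::ref`. A lambda's capture list, by contrast, spells out copy versus reference (or move) at the point of use.

```cpp
#include <cassert>
#include <functional>

// Increments its argument in place.
int Increment(int& counter) { return ++counter; }

// std::bind stores a *copy* of 'counter'; each call increments that copy,
// so this returns counter + 2 while 'counter' itself is left untouched.
int BindByValue(int& counter) {
  auto bound = std::bind(Increment, counter);
  bound();
  return bound();
}

// std::ref is needed to make bind refer to the caller's variable.
void BindByRef(int& counter) {
  auto bound = std::bind(Increment, std::ref(counter));
  bound();
}

// The equivalent lambda makes the by-reference choice explicit.
void LambdaByRef(int& counter) {
  auto by_ref = [&counter]() { return Increment(counter); };
  by_ref();
}
```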


PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr);
> Re-reading the comments above, status seems to indicate whether status was 
Right - I was in two minds about using different statuses, or merging together the one from the RpcController and the one that we must provide somehow. I think this is the simplest way to pass both statuses, but let me know if you have an idea! I added a comment for now.
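Below is a minimal sketch of the two-status convention under discussion. The `Status` and `Controller` types are hypothetical stand-ins for Impala's `Status` and the KRPC controller. The first argument says whether the RPC could be attempted at all. Only when that is OK does the callback consult the controller for the remote outcome. The combining expression mirrors `status.ok() ? FromKuduStatus(controller->status()) : status` in the data-stream-sender.cc callback quoted in this thread.

```cpp
#include <cassert>
#include <string>

// Hypothetical minimal status type: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status OK() { return Status{}; }
};

// Hypothetical stand-in for the RPC controller's record of the remote result.
struct Controller {
  Status remote_status;
};

// 'attempt_status' reports whether the RPC could be issued at all; when it is
// OK, the outcome of the call itself lives in the controller.
Status EffectiveStatus(const Status& attempt_status, const Controller& controller) {
  return attempt_status.ok() ? controller.remote_status : attempt_status;
}
```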


Line 337:       kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms);
> long line
Done


http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS2, Line 203:   auto cb = [self_ptr = weak_ptr<DataStreamSender::Channel>(self_),
             :       instance_id = fragment_instance_id_, proto_batch = batch]
             :       (const Status& status, TransmitDataRequestPb* request,
             :       TransmitDataResponsePb* response, RpcController* controller) {
             : 
             :     // Ensure that request and response get deleted when this callback returns.
             :     auto request_container = unique_ptr<TransmitDataRequestPb>(request);
             :     auto response_container = unique_ptr<TransmitDataResponsePb>(response);
             : 
             :     // Check if this channel still exists.
             :     auto channel = self_ptr.lock();
             :     if (!channel) return;
             :     {
             :       lock_guard<SpinLock> l(channel->lock_);
             :       Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status;
             : 
             :       int32_t status_code = response->status().status_code();
             :       channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE;
             : 
             :       if (!rpc_status.ok()) {
             :         channel->last_rpc_status_ = rpc_status;
             :       } else if (!channel->recvr_gone_) {
             :         if (status_code != TErrorCode::OK) {
             :           // Don't bubble up the 'receiver gone' status, because it's not an error.
             :           channel->last_rpc_status_ = Status(response->status());
             :         } else {
             :           int size = proto_batch->GetSize();
             :           channel->num_data_bytes_sent_.Add(size);
             :           VLOG_ROW << "incremented #data_bytes_sent="
             :                    << channel->num_data_bytes_sent_.Load();
             :         }
             :       }
             :       channel->rpc_in_flight_ = false;
             :     }
             :     channel->rpc_done_cv_.notify_one();
             :   };
> I am no C++ expert so this question may be stupid: can we not write this as
We could write this as a method, and use bind(), or we could create a struct with one method (this one) that captures the context upon construction. The latter is what a lambda compiles down to, and I prefer the syntax sugar a lambda gives you. The former uses bind(), which I am not a great fan of. 

In my experience, gdb handles this just fine, and the stack is IMHO cleaner than using bind() (I've broken in this method lots of times!).
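The sketch below makes the equivalence concrete, using a hypothetical `Channel` type. The `OnRpcComplete` struct is, roughly, the closure type the compiler generates for the lambda next to it. It includes the same `weak_ptr` lock check the callback quoted above uses to detect a channel that has already gone away.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the sender's channel.
struct Channel {
  int rpcs_completed = 0;
};

// Hand-written equivalent of a closure type: the captures become members
// that are initialized at construction.
struct OnRpcComplete {
  std::weak_ptr<Channel> self_ptr;
  // Returns false if the channel was already destroyed.
  bool operator()() const {
    std::shared_ptr<Channel> channel = self_ptr.lock();
    if (channel == nullptr) return false;
    ++channel->rpcs_completed;
    return true;
  }
};

// The same logic written as a lambda with an init-capture.
auto MakeLambda(const std::shared_ptr<Channel>& channel) {
  return [self_ptr = std::weak_ptr<Channel>(channel)]() {
    std::shared_ptr<Channel> ch = self_ptr.lock();
    if (ch == nullptr) return false;
    ++ch->rpcs_completed;
    return true;
  };
}
```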


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Michael Ho (Code Review)" <ge...@cloudera.org>.
Michael Ho has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 3:

(6 comments)

Some more comments. Still going through the patch.

http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 106: Ownership is
             :   // shared by the caller, and the RPC subsystem
Doesn't std::move transfer ownership, so that the caller no longer shares it?


PS3, Line 143: are owned by the caller
Ownership is temporarily transferred to the RPC call when this function is invoked, right?
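Whether ownership is shared or transferred depends on how the caller passes the object. The sketch below assumes (hypothetically) a `shared_ptr` parameter taken by value; the actual parameter types in rpc.h are not shown here, and `Request` and `RpcSubsystem` are illustrative names. Passing a copy leaves the caller as a co-owner. Passing with `std::move` hands over the caller's share and leaves the caller's pointer null.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical payload type.
struct Request {
  int id = 0;
};

// Stand-in for an RPC subsystem that holds the request until completion.
struct RpcSubsystem {
  std::shared_ptr<Request> in_flight;
  void Send(std::shared_ptr<Request> req) { in_flight = std::move(req); }
};
```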


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS3, Line 214: !channel
channel == nullptr


PS3, Line 252: batch->compressed_tuple_data
Is this transferring ownership to the RPC subsystem? AddSideCar() internally uses std::move(). This seems subtle enough to warrant a comment.


PS3, Line 266: MonoDelta::FromMilliseconds(numeric_limits<int32_t>::max())
This is a subtle change in behavior from the previous Impala version. In particular, FLAGS_backend_client_rpc_timeout_ms set the timeout on a socket, which took effect if a Thrift thread was stuck writing to the socket.

Given that KRPC sockets are asynchronous, the DSS may be blocked for quite a while until the query gets cancelled. Should we impose some reasonably conservative timeout here?
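One way to express the bounded alternative is sketched below. The names are hypothetical, and the sketch uses `std::condition_variable` rather than the patch's actual types. The sender's wait for RPC completion (the `rpc_done_cv_` notified in the callback quoted in this thread) gets a finite deadline, so a stuck RPC surfaces as an error instead of an indefinite block. This bounds only the local wait. The RPC-level timeout (the `MonoDelta` above) would presumably still need a finite value for KRPC itself to abandon the call.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical model of a sender waiting for its in-flight RPC to finish.
class InFlightRpc {
 public:
  void Start() {
    std::lock_guard<std::mutex> l(lock_);
    in_flight_ = true;
  }

  // Called from the RPC completion callback.
  void Complete() {
    {
      std::lock_guard<std::mutex> l(lock_);
      in_flight_ = false;
    }
    done_cv_.notify_one();
  }

  // Returns false if the RPC is still in flight after 'timeout', instead of
  // blocking until the query is cancelled.
  bool WaitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    return done_cv_.wait_for(l, timeout, [this] { return !in_flight_; });
  }

 private:
  std::mutex lock_;
  std::condition_variable done_cv_;
  bool in_flight_ = false;
};
```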


http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc
File be/src/runtime/row-batch.cc:

PS3, Line 117: DCHECK(
DCHECK_EQ


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#5).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch ports the data-flow parts of ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting, and remains
  implemented in Thrift for now. The second, DataStreamService, handles
  large-payload RPCs for transmitting runtime filters and row batches
  between hosts.

* In the DataStreamService, all RPCs use 'native' protobuf. The
  DataStreamService starts on the port previously reserved for the
  StatestoreSubscriberService (which is also a KRPC service), to avoid
  having to configure another port when starting Impala. When the
  ImpalaInternalService is ported to KRPC, all services will run on one
  port.

* To support addressing two different backend services, a data
  service port has been added to TBackendDescriptor.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus
  synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like
  filter propagation, or, in the DataStreamSender case, a dedicated
  sender-side thread per sender/receiver pair. In this patch, the
  PublishFilter() and TransmitData() RPCs are sent asynchronously using
  KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous
  RPCs. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* Also include a fix for KUDU-2011 that properly allows sidecars to be
  shared between KRPC and the RPC caller (fixing IMPALA-5093, a
  corruption bug).

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without major logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Senders no longer
  block on the sender side, so the complex promise-based machinery is no
  longer needed. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If
  the receiver does not arrive promptly, the DataStreamManager cleans up
  the waiting senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the
  relationship between ExecEnv and ImpalaServer. ImpalaServer now
  follows the standard construct->Init()->Start()->Join() lifecycle that
  we use for other services.

* Ensure that all addresses used for KRPCs are fully resolved, avoiding
  the need to resolve them for each RPC.
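The FindRecvr() scheme of parking senders until their receiver arrives can be sketched as follows, under several assumptions. All names are hypothetical. Receivers are keyed by a string rather than Impala's fragment-instance and node IDs. Locking is omitted, so the sketch is single-threaded. The timeout passed to `ExpireOlderThan()` plays the role of FLAGS_datastream_sender_timeout_ms.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical model of a sender that arrived before its receiver: when it
// arrived, and a callback used to respond to it.
struct PendingSender {
  Clock::time_point arrival;
  std::function<void(bool receiver_found)> respond;
};

class PendingSenderList {
 public:
  // Called when a sender's RPC arrives and no receiver is registered yet.
  void AddPending(const std::string& recvr_id, PendingSender sender) {
    pending_[recvr_id].push_back(std::move(sender));
  }

  // Called when the receiver is created: hand it every waiting sender.
  void OnReceiverCreated(const std::string& recvr_id) {
    auto it = pending_.find(recvr_id);
    if (it == pending_.end()) return;
    for (PendingSender& s : it->second) s.respond(true);
    pending_.erase(it);
  }

  // Called periodically: fail senders that have waited longer than 'timeout'.
  void ExpireOlderThan(Clock::time_point now, Clock::duration timeout) {
    for (auto it = pending_.begin(); it != pending_.end();) {
      std::vector<PendingSender>& senders = it->second;
      for (auto s = senders.begin(); s != senders.end();) {
        if (now - s->arrival > timeout) {
          s->respond(false);
          s = senders.erase(s);
        } else {
          ++s;
        }
      }
      if (senders.empty()) {
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
  }

 private:
  std::map<std::string, std::vector<PendingSender>> pending_;
};
```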

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
---
M .clang-format
M CMakeLists.txt
M be/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exprs/expr-test.cc
M be/src/kudu/rpc/rpc_sidecar.cc
M be/src/kudu/rpc/rpc_sidecar.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/TAcceptQueueServer.cpp
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc-mgr.inline.h
M be/src/rpc/rpc.h
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/scheduling/backend-config-test.cc
M be/src/scheduling/backend-config.cc
M be/src/scheduling/backend-config.h
M be/src/scheduling/query-schedule.h
M be/src/scheduling/scheduler-test-util.cc
M be/src/scheduling/scheduler-test-util.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/service/session-expiry-test.cc
M be/src/statestore/statestore-subscriber.cc
M be/src/statestore/statestore-test.cc
M be/src/statestore/statestore.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/blocking-queue.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/network-util.cc
M be/src/util/network-util.h
M be/src/util/thread-pool.h
M bin/impala-config.sh
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/StatestoreService.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
M tests/query_test/test_runtime_filters.py
M tests/stress/test_mini_stress.py
84 files changed, 2,796 insertions(+), 1,983 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/03/7103/5
-- 

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Michael Ho <kw...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC

Posted by "Sailesh Mukil (Code Review)" <ge...@cloudera.org>.
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 1:

(12 comments)

> This patch passes core, exhaustive and ASAN tests. It can execute
 > 32 concurrent streams of TPCDS-Q17 @ scale factor 30000 on a
 > 138-node cluster with Kerberos enabled. (I don't believe the
 > previous implementation could do this effectively because of the
 > number of Thrift connections required).
 > 
 > Some perf results from a 20-node cluster:
 > 
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | TPCH(_300) | TPCH-Q3  | parquet / none / none | 32.55  | 28.18       |   +15.51%  |   4.71%   |   1.17%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q13 | parquet / none / none | 24.43  | 22.21       |   +9.99%   |   0.61%   |   0.70%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q8  | parquet / none / none | 7.53   | 7.05        |   +6.69%   |   1.70%   |   2.09%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q22 | parquet / none / none | 6.35   | 6.04        |   +5.19%   |   0.37%   |   0.76%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q14 | parquet / none / none | 4.28   | 4.10        |   +4.36%   |   0.03%   |   0.73%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q15 | parquet / none / none | 3.53   | 3.41        |   +3.69%   |   0.61%   |   1.42%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q16 | parquet / none / none | 6.09   | 5.87        |   +3.63%   |   0.15%   |   1.78%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q11 | parquet / none / none | 1.73   | 1.70        |   +2.22%   |   0.10%   |   0.95%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 | 103.71      |   +2.06%   |   0.57%   |   0.44%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q9  | parquet / none / none | 30.76  | 30.46       |   +1.00%   |   2.57%   |   1.22%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q1  | parquet / none / none | 22.14  | 21.94       |   +0.91%   |   0.81%   |   0.86%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q4  | parquet / none / none | 5.09   | 5.05        |   +0.79%   |   0.48%   |   2.54%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q18 | parquet / none / none | 31.76  | 32.54       |   -2.39%   |   0.44%   |   0.03%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q2  | parquet / none / none | 1.98   | 2.04        |   -2.74%   |   7.17%   |   7.41%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q5  | parquet / none / none | 47.62  | 48.98       |   -2.79%   |   0.51%   |   0.16%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q20 | parquet / none / none | 3.18   | 3.27        |   -2.89%   |   1.34%   |   1.98%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q6  | parquet / none / none | 1.32   | 1.37        |   -3.72%   |   0.03%   |   4.00%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q10 | parquet / none / none | 9.00   | 9.48        |   -5.06%   |   0.16%   |   0.69%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q17 | parquet / none / none | 5.16   | 5.75        |   -10.18%  |   6.44%   |   2.63%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q12 | parquet / none / none | 3.01   | 3.39        |   -11.38%  |   2.43%   |   0.06%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q19 | parquet / none / none | 25.20  | 28.82       | I -12.57%  |   0.01%   |   0.75%        | 1           | 3     |
 > | TPCH(_300) | TPCH-Q7  | parquet / none / none | 45.32  | 61.16       | I -25.91%  |   0.55%   |   2.22%        | 1           | 3     |
 > +------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > 
 > Primitives (note the significant regression in many_independent_fragments,
 > that needs further attention)
 > 
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | Workload            | Query                                                  | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
 > | TARGETED-PERF(_300) | primitive_many_independent_fragments                   | parquet / none / none | 377.69 | 189.40      | R +99.42%  |   0.32%   |   0.22%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_in_list                        | parquet / none / none | 0.95   | 0.89        |   +6.16%   |   0.01%   |   0.44%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_many_fragments                               | parquet / none / none | 60.87  | 57.83       |   +5.25%   |   1.00%   |   0.54%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_highndv                       | parquet / none / none | 25.53  | 24.69       |   +3.40%   |   1.30%   |   0.92%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_top-n_all                                    | parquet / none / none | 40.68  | 39.36       |   +3.35%   |   0.75%   |   1.49%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_exchange_broadcast                           | parquet / none / none | 84.74  | 82.23       |   +3.05%   |   5.18%   |   0.13%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_intrinsic_appx_median                        | parquet / none / none | 35.04  | 34.21       |   +2.42%   |   1.01%   |   0.96%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_like                           | parquet / none / none | 5.48   | 5.38        |   +1.74%   |   0.94%   |   0.93%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_pk                            | parquet / none / none | 95.74  | 94.35       |   +1.47%   |   0.82%   |   2.74%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_decimal_arithmetic                           | parquet / none / none | 100.51 | 99.40       |   +1.12%   |   0.72%   |   1.62%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_1                          | parquet / none / none | 4.73   | 4.68        |   +1.04%   |   0.04%   |   1.12%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_bigint_lowndv                        | parquet / none / none | 3.34   | 3.32        |   +0.48%   |   2.30%   |   0.00%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_bigint_expression                    | parquet / none / none | 18.49  | 18.42       |   +0.41%   |   2.91%   |   4.33%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_decimal_selective                     | parquet / none / none | 0.60   | 0.60        |   +0.22%   |   0.27%   |   0.35%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_shuffle_join_one_to_many_string_with_groupby | parquet / none / none | 240.84 | 241.03      |   -0.08%   |   0.46%   |   0.44%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_3                          | parquet / none / none | 1.02   | 1.02        |   -0.35%   |   0.07%   |   0.34%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_all                                  | parquet / none / none | 54.61  | 54.84       |   -0.42%   |   1.48%   |   1.04%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_empty_build_join_1                           | parquet / none / none | 6.61   | 6.65        |   -0.72%   |   0.02%   |   0.78%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_4                          | parquet / none / none | 0.86   | 0.88        |   -1.76%   |   0.28%   |   3.07%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_decimal_lowndv.test                  | parquet / none / none | 3.39   | 3.47        |   -2.24%   |   2.29%   |   1.41%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_intrinsic_to_date                            | parquet / none / none | 79.05  | 81.70       |   -3.24%   |   0.42%   |   0.36%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_selective                      | parquet / none / none | 0.53   | 0.55        |   -4.82%   |   4.22%   |   0.39%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_2                             | parquet / none / none | 4.07   | 4.36        |   -6.68%   |   0.66%   |   0.45%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_3                             | parquet / none / none | 49.97  | 53.78       |   -7.09%   |   0.45%   |   0.45%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_topn_bigint                                  | parquet / none / none | 5.13   | 5.52        |   -7.12%   |   0.68%   |   0.06%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_selective                      | parquet / none / none | 0.37   | 0.40        |   -7.23%   |   7.01%   |   0.28%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_broadcast_join_1                             | parquet / none / none | 0.96   | 1.04        |   -7.25%   |   0.08%   |   7.60%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_string_non_selective                  | parquet / none / none | 0.90   | 0.98        |   -7.67%   |   5.27%   |   2.36%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_decimal_non_selective                 | parquet / none / none | 0.85   | 0.93        |   -8.51%   |   0.06%   |   2.42%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_orderby_bigint                               | parquet / none / none | 14.56  | 16.13       | I -9.72%   |   0.44%   |   0.12%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_2                          | parquet / none / none | 24.32  | 27.75       | I -12.36%  |   0.28%   |   0.63%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_groupby_decimal_highndv                      | parquet / none / none | 21.17  | 24.49       |   -13.58%  |   0.35%   |   1.96%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_exchange_shuffle                             | parquet / none / none | 69.10  | 80.03       |   -13.66%  |   0.10%   |   1.22%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_shuffle_join_union_all_with_groupby          | parquet / none / none | 56.55  | 67.01       | I -15.61%  |   1.06%   |   0.23%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_conjunct_ordering_5                          | parquet / none / none | 28.41  | 35.20       |   -19.28%  |   5.12%   |   6.49%        | 1           | 3     |
 > | TARGETED-PERF(_300) | primitive_filter_bigint_non_selective                  | parquet / none / none | 1.08   | 1.65        | I -34.87%  |   7.17%   |   3.12%        | 1           | 3     |
 > +---------------------+--------------------------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Did a first pass.

Just to clarify: the commit message says Kerberos isn't supported, but was your benchmark run with Kerberos? Did you run it with a security patch that enables Kerberos?

http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

Line 61:           bool unused = false;
Doesn't the contract for FindRecvr() state that we need to hold 'lock_' before we call it?
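One way to make a "caller must hold 'lock_'" contract checkable, sketched here in standard C++ with hypothetical names (StreamMgrSketch, FindRecvrLocked, Receiver are illustrations, not Impala's classes), is to have the helper take the held lock as a parameter and assert on it:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for a data stream receiver.
struct Receiver {
  std::string id;
};

// Hypothetical manager whose lookup helper requires the caller to hold 'lock_'.
class StreamMgrSketch {
 public:
  void AddRecvr(const std::string& id) {
    std::unique_lock<std::mutex> l(lock_);
    auto r = std::make_shared<Receiver>();
    r->id = id;
    receivers_[id] = r;
  }

  // Public entry point: takes 'lock_' and then calls the helper.
  std::shared_ptr<Receiver> Lookup(const std::string& id) {
    std::unique_lock<std::mutex> l(lock_);
    return FindRecvrLocked(id, l);
  }

 private:
  // Taking the held lock as a parameter makes the locking contract visible at
  // every call site, and the assert checks it is really 'lock_' that is held.
  std::shared_ptr<Receiver> FindRecvrLocked(
      const std::string& id, const std::unique_lock<std::mutex>& held) {
    assert(held.owns_lock() && held.mutex() == &lock_);
    auto it = receivers_.find(id);
    return it == receivers_.end() ? nullptr : it->second;
  }

  std::mutex lock_;
  std::map<std::string, std::shared_ptr<Receiver>> receivers_;
};
```

A call site that forgets to take the lock then fails to compile (no lock to pass) rather than silently racing.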


Line 105:   EarlySendersList waiters;
Add a brief comment:
"Process payloads of early senders for this receiver now that it's created" or something similar.


PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id);
According to the header comment in data-stream-mgr.h, a sender shouldn't be in 'closing_senders' if that same sender has a payload waiting in 'early_senders'. This ensures that all payloads are processed through AddBatch() before CloseSender() is called.

The exception is when the sender was closed or cancelled and no longer cares about its earlier TransmitData() payloads, presumably because the query failed or was cancelled.

Given that, wouldn't it be better to call RemoveSender() before calling EnqueueRowBatch()? We don't want to process data if the query is being cancelled anyway.
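The suggested ordering can be sketched as below. All types and names (EarlySendersSketch, RecvrSketch, ProcessEarlySenders) are hypothetical, and the sketch assumes the receiver ignores batches from senders that were already removed; whether the real receiver should behave that way is exactly the question raised above.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <vector>

// Hypothetical early-sender bookkeeping; not Impala's EarlySendersList.
struct EarlyPayload {
  int32_t sender_id;
  std::string batch;  // Stand-in for a serialized row batch.
};

struct EarlySendersSketch {
  std::vector<EarlyPayload> waiting_senders;
  std::vector<int32_t> closing_senders;
};

// Hypothetical receiver that records what it accepted.
class RecvrSketch {
 public:
  void RemoveSender(int32_t sender_id) { removed_.insert(sender_id); }
  void EnqueueRowBatch(const EarlyPayload& p) {
    // Assumption: batches from senders that were already removed are dropped.
    if (removed_.count(p.sender_id) > 0) return;
    enqueued_.push_back(p.batch);
  }
  const std::vector<std::string>& enqueued() const { return enqueued_; }

 private:
  std::set<int32_t> removed_;
  std::vector<std::string> enqueued_;
};

// Closing senders are processed first, so payloads from senders that were
// closed or cancelled are never enqueued.
void ProcessEarlySenders(const EarlySendersSketch& waiters, RecvrSketch* recvr) {
  assert(recvr != nullptr);
  for (int32_t sender_id : waiters.closing_senders) recvr->RemoveSender(sender_id);
  for (const EarlyPayload& p : waiters.waiting_senders) recvr->EnqueueRowBatch(p);
}
```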


PS1, Line 290: senders
Use a more descriptive name, such as "timed_out_senders".
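For illustration, a sweep of stale early senders might look like the sketch below, where a name like `timed_out_senders` reads naturally. EarlySenderEntry and CollectTimedOutSenders are hypothetical names, and keying entries by a receiver id string is an assumption, not the patch's actual layout.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical early-sender entry, keyed by receiver id in the map below.
struct EarlySenderEntry {
  Clock::time_point arrival_time;
  std::vector<int> sender_ids;
};

// Removes entries that have waited longer than 'timeout' and returns them, so
// the caller can respond to those senders (e.g. with an error; assumed here).
std::vector<EarlySenderEntry> CollectTimedOutSenders(
    std::map<std::string, EarlySenderEntry>* early_senders,
    Clock::time_point now, Clock::duration timeout) {
  assert(early_senders != nullptr);
  std::vector<EarlySenderEntry> timed_out_senders;
  for (auto it = early_senders->begin(); it != early_senders->end();) {
    if (now - it->second.arrival_time > timeout) {
      timed_out_senders.push_back(it->second);
      it = early_senders->erase(it);  // erase() returns the next iterator.
    } else {
      ++it;
    }
  }
  return timed_out_senders;
}
```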


PS1, Line 300: early_senders_
Consider the following case:

- 'early_senders_' contained closed_senders that were erased here because the receiver was not created in time.
- Next, the ExchangeNode finally creates the receiver and calls GetBatch().

Wouldn't GetBatch() then hang indefinitely at data_arrival_cv_.wait()? No data will ever arrive from that sender, since it has already sent its EndDataStream RPC.
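The scenario can be reproduced with a small model of a receiver. RecvrModel is hypothetical, and its bounded wait_for() stands in for the untimed data_arrival_cv_.wait() mentioned above, so that "would hang" shows up as a `false` return instead of a stuck thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical receiver model: GetBatch() waits until a batch arrives or every
// sender has closed.
class RecvrModel {
 public:
  explicit RecvrModel(int num_senders) : num_remaining_senders_(num_senders) {}

  void AddBatch(const std::string& batch) {
    { std::lock_guard<std::mutex> l(lock_); queue_.push_back(batch); }
    data_arrival_cv_.notify_one();
  }

  void RemoveSender() {
    { std::lock_guard<std::mutex> l(lock_); --num_remaining_senders_; }
    data_arrival_cv_.notify_all();
  }

  // Returns false if neither a batch nor end-of-stream arrived in 'timeout';
  // with an untimed wait, that case would block forever.
  bool GetBatch(std::chrono::milliseconds timeout, std::string* batch, bool* eos) {
    assert(batch != nullptr && eos != nullptr);
    std::unique_lock<std::mutex> l(lock_);
    bool ready = data_arrival_cv_.wait_for(l, timeout, [this] {
      return !queue_.empty() || num_remaining_senders_ == 0;
    });
    if (!ready) return false;
    *eos = queue_.empty();
    if (!*eos) {
      *batch = queue_.front();
      queue_.pop_front();
    }
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::string> queue_;
  int num_remaining_senders_;
};
```

If the sender's close was dropped along with its early-sender entry, RemoveSender() is never called, so the wait predicate can never become true.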


Line 321:     // Wait for 10s
Add a brief comment stating that this is to check whether the DataStreamMgr is being shut down.
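The pattern being commented on, a periodic wait that doubles as a shutdown check, can be sketched with std::condition_variable::wait_for and a predicate. MaintenanceLoopSketch and its members are hypothetical; the sketch only shows why such a loop wakes up promptly on shutdown rather than after the full period.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical maintenance thread: wakes every 'period' for periodic work
// (e.g. timing out early senders) and exits promptly on shutdown.
class MaintenanceLoopSketch {
 public:
  explicit MaintenanceLoopSketch(std::chrono::milliseconds period)
      : period_(period), thread_([this] { Run(); }) {}

  ~MaintenanceLoopSketch() { Shutdown(); }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    shutdown_cv_.notify_all();
    if (thread_.joinable()) thread_.join();
  }

  int iterations() {
    std::lock_guard<std::mutex> l(lock_);
    return iterations_;
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> l(lock_);
    while (true) {
      // Wait up to 'period_'; wait_for() returns true once 'shutdown_' is set,
      // so shutdown does not have to wait out the full period.
      if (shutdown_cv_.wait_for(l, period_, [this] { return shutdown_; })) return;
      ++iterations_;  // Periodic work would go here.
    }
  }

  std::chrono::milliseconds period_;
  std::mutex lock_;
  std::condition_variable shutdown_cv_;
  bool shutdown_ = false;
  int iterations_ = 0;
  std::thread thread_;  // Declared last so it starts after the other members.
};
```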


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS1, Line 81: will do one of three things
nit: would be nice to format them as bullet points.


PS1, Line 83: if the buffer is full
"if the batch queues are full"?
i.e. be more specific about what "buffer" it is.


PS1, Line 87: the sender
"the sender along with its payload" ?


PS1, Line 124: to
from?


Line 224:   /// has not yet prepared 'payload' is queued until it arrives, or is timed out. If the
nit: been prepared,


http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS1, Line 255: void UpdateFilter
Leave a TODO stating that this should move to query-state.h/cc after IMPALA-3825 (if you agree).


-- 
To view, visit http://gerrit.cloudera.org:8080/7103
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes