Posted to reviews@impala.apache.org by "Henry Robinson (Code Review)" <ge...@cloudera.org> on 2017/02/03 01:24:01 UTC

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Henry Robinson has uploaded a new change for review.

  http://gerrit.cloudera.org:8080/5888

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................

IMPALA-4856: Port ImpalaInternalService to KRPC

This patch ports the ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two KRPC services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting. The second,
  DataStreamService, handles large-payload RPCs for transmitting runtime
  filters and row batches between hosts. The separation allows us to
  dedicate resources to each service, rather than have them compete for
  the same thread pool and queue space.

* In the DataStreamService, all RPCs use 'native'
  protobuf. ImpalaInternalService RPCs remain wrappers of Thrift data
  structures.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or, in the DataStreamSender case, a dedicated sender-side
  thread for each sender/receiver pair. In this patch, the
  PublishFilter(), CancelPlanFragment() and TransmitData() RPCs are all
  sent asynchronously using KRPC's thread pools.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side. In this patch, the
  receiver side still blocks if the receiver is unable to process a new
  row batch. A follow-on patch will change that to send notifications to
  the sender asynchronously, without blocking in the receiver threads.

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without logic
  changes.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the relationship
  between ExecEnv and ImpalaServer. ImpalaServer now follows the standard
  construct->Init()->Start()->Join() lifecycle that we use for other
  services.
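The asynchronous 'broadcast' pattern described in this list can be illustrated with a small single-threaded C++ sketch. FakeReactor, AsyncCall() and BroadcastFilter() are hypothetical names, not the RpcMgr or Rpc API from this patch: the fake reactor just queues completion callbacks, so the caller issues every RPC up front and never blocks on any single destination.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for an RPC reactor. AsyncCall() never blocks: it
// records a completion callback that RunCompletions() invokes later, roughly
// the way a reactor thread would when a response arrives.
class FakeReactor {
 public:
  void AsyncCall(const std::string& host, std::function<void(bool)> done) {
    // In this sketch a call "succeeds" iff the host name is non-empty.
    completions_.push_back([host, done]() { done(!host.empty()); });
  }
  int NumInFlight() const { return static_cast<int>(completions_.size()); }
  void RunCompletions() {
    std::vector<std::function<void()>> ready;
    ready.swap(completions_);
    for (auto& c : ready) c();
  }

 private:
  std::vector<std::function<void()>> completions_;
};

// Issues one RPC per destination from a single thread: no per-host thread and
// no fixed-size pool. Results are tallied by the completion callbacks.
void BroadcastFilter(FakeReactor* reactor, const std::vector<std::string>& hosts,
                     int* num_ok, int* num_failed) {
  for (const std::string& host : hosts) {
    reactor->AsyncCall(host, [num_ok, num_failed](bool ok) {
      if (ok) ++*num_ok; else ++*num_failed;
    });
  }
}
```

In this sketch all three calls are in flight before any completes. A real caller would also have to keep the counters alive until the last callback runs.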

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
---
M .clang-format
M be/generated-sources/gen-cpp/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/expr-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc.h
M be/src/rpc/thrift-server-test.cc
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/descriptors.cc
M be/src/runtime/descriptors.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/request-pool-service.h
M be/src/scheduling/simple-scheduler-test-util.h
M be/src/service/CMakeLists.txt
M be/src/service/fe-support.cc
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
A be/src/service/impala_internal_service.proto
M be/src/service/impalad-main.cc
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
59 files changed, 1,400 insertions(+), 1,264 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/88/5888/1
-- 
To view, visit http://gerrit.cloudera.org:8080/5888
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#5).

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................

IMPALA-4856: Port ImpalaInternalService to KRPC

This patch ports the ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two KRPC services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting. The second,
  DataStreamService, handles large-payload RPCs for transmitting runtime
  filters and row batches between hosts. The separation allows us to
  dedicate resources to each service, rather than have them compete for
  the same thread pool and queue space.

* In the DataStreamService, all RPCs use 'native'
  protobuf. ImpalaInternalService RPCs remain wrappers of Thrift data
  structures.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or, in the DataStreamSender case, a dedicated sender-side
  thread for each sender/receiver pair. In this patch, the
  PublishFilter(), CancelPlanFragment() and TransmitData() RPCs are all
  sent asynchronously using KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous RPCs,
  and to more properly handle the case where receiver queues are
  full. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without logic
  changes.

* Simplify the FindRecvr() logic in DataStreamManager. Since blocking no
  longer has to be handled on the sender side, the complex promise-based
  machinery is no longer needed. Instead, all senders with no receiver are
  added to a per-receiver list, which is processed when the receiver
  arrives. If the receiver does not arrive promptly, the DataStreamManager
  cleans up the waiting senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the relationship
  between ExecEnv and ImpalaServer. ImpalaServer now follows the standard
  construct->Init()->Start()->Join() lifecycle that we use for other
  services.
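A minimal C++ sketch of the per-receiver pending-sender list described in the FindRecvr() bullet, assuming integer IDs and an explicit clock so that it is deterministic. PendingSenders and its methods are hypothetical; the timeout_ms argument plays the role of FLAGS_datastream_sender_timeout_ms.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical sketch: senders whose receiver is not registered yet are parked
// per receiver instead of blocking a thread.
struct PendingSender {
  int sender_id;
  int64_t arrival_ms;  // When the sender's TransmitData() arrived.
};

class PendingSenders {
 public:
  void Add(int recvr_id, int sender_id, int64_t now_ms) {
    pending_[recvr_id].push_back({sender_id, now_ms});
  }

  // Called when the receiver is created: hands back every parked sender so
  // that each one can now be processed.
  std::vector<PendingSender> OnReceiverArrived(int recvr_id) {
    std::vector<PendingSender> result;
    auto it = pending_.find(recvr_id);
    if (it != pending_.end()) {
      result.swap(it->second);
      pending_.erase(it);
    }
    return result;
  }

  // Periodic maintenance: drops senders that waited longer than timeout_ms for
  // a receiver that never showed up. Returns the number of expired senders.
  int ExpireOlderThan(int64_t now_ms, int64_t timeout_ms) {
    int expired = 0;
    for (auto it = pending_.begin(); it != pending_.end();) {
      std::vector<PendingSender> kept;
      for (const PendingSender& s : it->second) {
        if (now_ms - s.arrival_ms > timeout_ms) {
          ++expired;  // A real implementation would reply with an error here.
        } else {
          kept.push_back(s);
        }
      }
      it->second.swap(kept);
      if (it->second.empty()) {
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
    return expired;
  }

 private:
  std::map<int, std::vector<PendingSender>> pending_;
};
```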

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
---
M .clang-format
M CMakeLists.txt
M be/generated-sources/gen-cpp/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/expr-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc.h
M be/src/rpc/thrift-server-test.cc
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/request-pool-service.h
M be/src/scheduling/scheduler-test-util.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
A be/src/service/exec_control_service.proto
M be/src/service/fe-support.cc
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/runtime-profile.cc
M be/src/util/runtime-profile.h
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
67 files changed, 2,365 insertions(+), 1,650 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/88/5888/5

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has abandoned this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Abandoned

Superseded by https://gerrit.cloudera.org/#/c/7103/


Gerrit-MessageType: abandon
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 1:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/5888/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 66: 
> sync callback ? AddData func may block some time that will cause  many work
You are correct, but those changes are to come in a subsequent patch which changes the protocol a bit.



Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Anonymous Coward (Code Review)" <ge...@cloudera.org>.
Anonymous Coward #168 has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 1:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/5888/1/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 66: 
Synchronous callback? AddData() may block for some time, which will cause many worker threads to block here, which is bad. I think this task should be moved to a business-processing thread that decides when to call the callback.
Something like the fbthrift ThriftServer-side task processing mechanism, which executes the callback function at the appropriate time?
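The proposal above (hand the request to a separate processing thread, which decides when to invoke the response callback) could look roughly like this C++ sketch. DeferredResponder, Handle() and ProcessUpTo() are hypothetical; ProcessUpTo() is called explicitly here instead of from a real worker thread so that the behaviour is deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical sketch: the RPC handler only enqueues the request together with
// its response callback, so the RPC worker thread returns immediately instead
// of blocking inside something like AddData().
class DeferredResponder {
 public:
  using RespondFn = std::function<void(bool accepted)>;

  void Handle(int batch_id, RespondFn respond) {
    queue_.push_back({batch_id, std::move(respond)});
  }

  // Would run on a separate processing thread in a real system. Handles at most
  // 'capacity' queued requests and responds to each; the rest stay queued.
  int ProcessUpTo(int capacity) {
    int processed = 0;
    while (processed < capacity && !queue_.empty()) {
      queue_.front().respond(true);
      queue_.pop_front();
      ++processed;
    }
    return processed;
  }

  std::size_t NumWaiting() const { return queue_.size(); }

 private:
  struct Request {
    int batch_id;
    RespondFn respond;
  };
  std::deque<Request> queue_;
};
```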



Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Marcel Kornacker (Code Review)" <ge...@cloudera.org>.
Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 245:   void AddData(const TUniqueId& fragment_instance_id, TransmitDataCtx&& payload);
> don't use an rvalue param here. rather, make ownership (and change of owner
in other words, pass in a pointer and stipulate that AddData owns it.



Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Marcel Kornacker (Code Review)" <ge...@cloudera.org>.
Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 5:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 77: /// After the first batch has been received, the sender continues to send batches, one at
> No, for a couple of reasons:
follow-on questions:
- should the queue be sized in terms of bytes or rows/row batches? the latter would probably lead to steadier behavior
- regarding point 2: with correct flow control, i don't see how the receiver stalling would still allow the senders to fill up the queue.

let's talk in person tomorrow, rather than a back & forth over comments.


Line 84: /// notified when the batch has been processed, or added to the batch queue,
> I'm not sure which part you're referring to - but yes, in general this is h
let's discuss tomorrow, i'm wondering if this feature can be avoided.


Line 131: /// is reached, the RPC will be considered failed, and the channel and sender will be
> In the queued batch case, we could look to see if the queue had any more ca
i think we should size the queue in terms of batches, not total bytes. the sender itself can keep track of total bytes.


Line 245:   /// If the stream would exceed its buffering limit as a result of queuing this batch,
> I feel that rvalues do make ownership explicit. You can't pass a non-tempor
i think this is inferior to passing a pointer to the context and stipulating that AddData (the callee) becomes the owner. also, move() is still more expensive than passing a pointer, and also more opaque (you don't know what's getting copied, unless you know the details of every field in the struct).

i really don't see any upside to a move() here.
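The 'opaque' point can be made concrete with a small C++ sketch of the two signatures under discussion. TransmitDataCtx's fields, CopyOnly, RvalueSink and PointerSink are hypothetical. Because CopyOnly declares a copy constructor but no move constructor, std::move() of the enclosing struct silently copies that member, while the pointer version only hands over an address. The sketch keeps the pointer in a std::unique_ptr purely so that it gets freed.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// A member type with a user-declared copy constructor and no move constructor:
// "moving" it falls back to copying.
struct CopyOnly {
  static int copies;
  CopyOnly() = default;
  CopyOnly(const CopyOnly&) { ++copies; }
  CopyOnly& operator=(const CopyOnly&) { ++copies; return *this; }
};
int CopyOnly::copies = 0;

struct TransmitDataCtx {
  std::vector<char> tuple_data;  // Moving this is cheap (buffer is stolen).
  CopyOnly legacy_field;         // Moving this is really a copy.
};

// Option 1: rvalue parameter. The callee move-constructs its own context.
struct RvalueSink {
  std::vector<TransmitDataCtx> queued;
  void AddData(TransmitDataCtx&& payload) { queued.push_back(std::move(payload)); }
};

// Option 2: pointer parameter. Only an address changes hands.
struct PointerSink {
  std::vector<std::unique_ptr<TransmitDataCtx>> queued;
  /// Takes ownership of 'payload'.
  void AddData(TransmitDataCtx* payload) { queued.emplace_back(payload); }
};
```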


http://gerrit.cloudera.org:8080/#/c/5888/5/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 124: /// execution. Senders do not block in TransmitData() RPCs, and may be cancelled at any
what do you mean, they don't block in transmitdata rpcs? they have to wait for a response, no?


Line 133: /// stream to continue correct operation.
where did the 120s come from again?



Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#3).

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................

IMPALA-4856: Port ImpalaInternalService to KRPC

This patch ports the ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two KRPC services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting. The second,
  DataStreamService, handles large-payload RPCs for transmitting runtime
  filters and row batches between hosts. The separation allows us to
  dedicate resources to each service, rather than have them compete for
  the same thread pool and queue space.

* In the DataStreamService, all RPCs use 'native'
  protobuf. ImpalaInternalService RPCs remain wrappers of Thrift data
  structures.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools plus synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or, in the DataStreamSender case, a dedicated sender-side
  thread for each sender/receiver pair. In this patch, the
  PublishFilter(), CancelPlanFragment() and TransmitData() RPCs are all
  sent asynchronously using KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous RPCs,
  and to more properly handle the case where receiver queues are
  full. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without logic
  changes.

* Simplify the FindRecvr() logic in DataStreamManager. Since blocking no
  longer has to be handled on the sender side, the complex promise-based
  machinery is no longer needed. Instead, all senders with no receiver are
  added to a per-receiver list, which is processed when the receiver
  arrives. If the receiver does not arrive promptly, the DataStreamManager
  cleans up the waiting senders after FLAGS_datastream_sender_timeout_ms.

* This patch also begins a clean-up of how ImpalaServer instances are
  created (by removing CreateImpalaServer) and clarifies the relationship
  between ExecEnv and ImpalaServer. ImpalaServer now follows the standard
  construct->Init()->Start()->Join() lifecycle that we use for other
  services.
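A minimal C++ sketch of the construct->Init()->Start()->Join() lifecycle mentioned in the last bullet. The Status struct and FakeServer class are hypothetical stand-ins, not ImpalaServer's real interface; the point is only that the constructor cannot fail, each later phase returns a status, and phases refuse to run out of order.

```cpp
#include <cassert>
#include <string>

// Hypothetical minimal status type.
struct Status {
  bool ok;
  std::string msg;
  static Status OK() { return {true, ""}; }
  static Status Error(const std::string& m) { return {false, m}; }
};

class FakeServer {
 public:
  enum class State { CONSTRUCTED, INITIALIZED, STARTED, JOINED };

  // Sets up resources; may fail, so the caller must check the result.
  Status Init() {
    if (state_ != State::CONSTRUCTED) return Status::Error("Init() called twice");
    state_ = State::INITIALIZED;
    return Status::OK();
  }
  // Begins serving requests.
  Status Start() {
    if (state_ != State::INITIALIZED) return Status::Error("Start() before Init()");
    state_ = State::STARTED;
    return Status::OK();
  }
  // A real server would block here until its service threads exit.
  void Join() {
    if (state_ == State::STARTED) state_ = State::JOINED;
  }
  State state() const { return state_; }

 private:
  State state_ = State::CONSTRUCTED;
};
```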

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
---
M .clang-format
M CMakeLists.txt
M be/generated-sources/gen-cpp/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/expr-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc.h
M be/src/rpc/thrift-server-test.cc
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/descriptors.cc
M be/src/runtime/descriptors.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/request-pool-service.h
M be/src/scheduling/scheduler-test-util.h
M be/src/service/CMakeLists.txt
M be/src/service/fe-support.cc
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
A be/src/service/impala_internal_service.proto
M be/src/service/impalad-main.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/runtime-profile.cc
M be/src/util/runtime-profile.h
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
68 files changed, 2,327 insertions(+), 1,652 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/88/5888/3

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Sailesh Mukil (Code Review)" <ge...@cloudera.org>.
Sailesh Mukil has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 3:

(14 comments)

Had a first look.

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

PS3, Line 327: VLOG_QUERY << "DataStreamMgr maintenance tasks complete. Took: "
             :                << PrettyPrinter::Print(MonotonicMillis() - start, TUnit::TIME_MS);
Do you think it's worth printing what maintenance tasks were done in this iteration? And maybe don't print anything if no work was done. Might help with debugging. Feel free to ignore if you disagree.

Also, wondering if VLOG_QUERY is the right log level to print this. Wouldn't this cause a lot of spam?


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS3, Line 135: idea
I would rather use the word 'assumption' here.


PS3, Line 212: TRANSMIT_DATA_TIMEOUT_SECONDS
I think there needs to be some comment clearly distinguishing between this timeout and FLAGS_datastream_sender_timeout_ms.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 129: pending_senders_
Maybe this could be future work, but I foresee a need to cap this at some number (which could be a function of the number of nodes) after which it would be beneficial to just fail the query.


PS3, Line 137: SpinLock
Isn't this a rather large amount of work to have under a spinlock?
Also, we would expect this lock to be fairly heavily contended.
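One common way to shrink a critical section like this is to swap the shared list out under the lock and do the per-item work after releasing it. The C++ sketch below is a general technique, not how the patch is written: it uses std::mutex rather than Impala's SpinLock, and PendingQueue and DrainAndProcess() are hypothetical names.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

class PendingQueue {
 public:
  void Push(int sender_id) {
    std::lock_guard<std::mutex> l(lock_);
    pending_.push_back(sender_id);
  }

  // Holds the lock only for an O(1) swap; 'process' runs without the lock, so
  // concurrent Push() calls are not stalled behind the per-sender work.
  template <typename Fn>
  int DrainAndProcess(Fn process) {
    std::vector<int> local;
    {
      std::lock_guard<std::mutex> l(lock_);
      local.swap(pending_);
    }
    for (int sender_id : local) process(sender_id);
    return static_cast<int>(local.size());
  }

 private:
  std::mutex lock_;
  std::vector<int> pending_;
};
```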


PS3, Line 191: // num_remaining_senders_ could be 0 because an AddBatch() can arrive *after* a
             :     // EndDataStream() RPC for the same sender, due to asynchrony on the sender side (the
             :     // sender gets closed or cancelled, but doesn't wait for the outstanding TransmitData()
             :     // to complete before trying to close the channel).
If this is the only case where num_remaining_senders_ can be 0, then is there any point in responding at all?
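The race in the quoted comment (a TransmitData() that arrives after the same sender's EndDataStream()) can be sketched as follows. RecvrSketch and its methods are hypothetical. Whether the real receiver needs to respond in that case is the open question above; this sketch drops the late batch but assumes the caller still answers the RPC, so that the sender's outstanding call does not wait until it times out.

```cpp
#include <cassert>

// Hypothetical receiver tracking how many senders have not yet sent EOS.
class RecvrSketch {
 public:
  explicit RecvrSketch(int num_senders) : num_remaining_senders_(num_senders) {}

  void EndDataStream() {
    if (num_remaining_senders_ > 0) --num_remaining_senders_;
  }

  // Returns true if the batch was queued. A batch that arrives after every
  // sender has sent EOS is dropped; the caller is still expected to respond.
  bool AddBatch() {
    if (num_remaining_senders_ == 0) return false;
    ++num_batches_queued_;
    return true;
  }

  int num_batches_queued() const { return num_batches_queued_; }

 private:
  int num_remaining_senders_;
  int num_batches_queued_ = 0;
};
```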


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.h
File be/src/runtime/data-stream-recvr.h:

PS3, Line 174: good_bytes_received_counter_
bytes_accepted_counter_ ?


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 80:     ExecEnv::GetInstance()->stream_mgr()->AddData(finst_id, move(payload));
This is a bit confusing. Why is there no need for a RespondSuccess() in this case?
I do see that there is a payload.context->RespondSuccess() inside AddData(), but that's only in the error case.

Also, ideally, for the sake of consistency, I would prefer that the error status be returned here and then see that the RPC is responded to from here. Unless you see a good reason for it not to be that way.


PS3, Line 99: // TODO: Check return
What he said.


Line 118:     context->GetInboundSidecar(filter.header.directory_sidecar_idx(), &filter.directory);
Same here, check return status.


Line 155:   context->RespondSuccess();
No need to return status.SetTStatus(&return_val)?
I see that we check it in ReportStatusCb().

This changes behavior a bit. It looks like currently, if a ReportExecStatus() RPC fails, we fail the query. That won't happen if the sidecar deserialization fails.
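A sketch of the behaviour this comment asks for: if decoding the sidecar fails, the handler puts the error into the response instead of unconditionally reporting success, so the coordinator can still fail the query. StatusSketch, DecodeSidecar(), ReportStatusResponse and HandleReportExecStatus() are hypothetical stand-ins, not the KRPC or Impala types.

```cpp
#include <cassert>
#include <string>

struct StatusSketch {
  bool ok;
  std::string msg;
};

// Hypothetical decoder: fails on an empty payload.
StatusSketch DecodeSidecar(const std::string& payload, std::string* out) {
  if (payload.empty()) return {false, "empty sidecar"};
  *out = payload;
  return {true, ""};
}

struct ReportStatusResponse {
  StatusSketch status;  // Checked on the coordinator side (cf. ReportStatusCb()).
};

// The RPC itself always completes, but the application-level status in the
// response reflects any decoding failure instead of being dropped.
void HandleReportExecStatus(const std::string& sidecar,
                            ReportStatusResponse* response) {
  std::string profile;
  StatusSketch s = DecodeSidecar(sidecar, &profile);
  // On success, 'profile' would be applied to the query state here (omitted).
  response->status = s;
}
```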


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

PS3, Line 1925: move
Include what you use? <utility>


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-server.h
File be/src/service/impala-server.h:

PS3, Line 123: Shutdown
Who calls this?


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impalad-main.cc
File be/src/service/impalad-main.cc:

PS3, Line 79: exec_env->Init();
Need to check returned status and fail if necessary.



Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 3:

(30 comments)

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

PS3, Line 327: VLOG_QUERY << "DataStreamMgr maintenance tasks complete. Took: "
             :                << PrettyPrinter::Print(MonotonicMillis() - start, TUnit::TIME_MS);
> Do you think it's worth printing what maintenance tasks were done in this i
Removed for now.


http://gerrit.cloudera.org:8080/#/c/5888/2/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS2, Line 59: In the first phase the sender initiates 
removed


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 60: /// first batch. Since the sender may start sending before the receiver is ready, the data
> we could relatively easily remove that restriction, the execrpc changes are
Any change I can think of would be substantial (two-phase start-up protocol) or bug-prone (waiting).


Line 77: /// fixed-size buffer for later processing, or discard the batch if the buffer is full. In
> how can the is-full case come up? shouldn't flow control keep the sender fr
No, for a couple of reasons:

1. Flow control doesn't kick in until the first round of batches has been sent, so all senders try to send batches at once.

2. In the steady-state case with a receiver that's faster than the senders, there'll be no queuing. But if the receiver suddenly stalls, there can be many senders still preparing a new batch, and together they can fill up the queue.

3. The queue is limited by the byte size of the batches. The next batch might be really large and overflow the queue, and we can't predict that in general.
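Point 3 (and the later question of sizing the queue in bytes or in batches) can be illustrated with two toy admission policies in C++. Both classes are hypothetical sketches, not DataStreamRecvr's queue: with a byte limit, whether the next batch fits depends on its size, which a sender cannot know in advance; with a batch-count limit, admission depends only on how many batches are already queued.

```cpp
#include <cassert>
#include <cstdint>

// Admits a batch only if total buffered bytes stay within the limit.
class ByteBoundedQueue {
 public:
  explicit ByteBoundedQueue(int64_t limit_bytes) : limit_bytes_(limit_bytes) {}
  bool TryAdd(int64_t batch_bytes) {
    if (buffered_bytes_ + batch_bytes > limit_bytes_) return false;
    buffered_bytes_ += batch_bytes;
    return true;
  }

 private:
  int64_t limit_bytes_;
  int64_t buffered_bytes_ = 0;
};

// Admits a batch only if fewer than 'limit_batches' are queued, whatever its size.
class BatchBoundedQueue {
 public:
  explicit BatchBoundedQueue(int limit_batches) : limit_batches_(limit_batches) {}
  bool TryAdd(int64_t /*batch_bytes*/) {
    if (num_batches_ >= limit_batches_) return false;
    ++num_batches_;
    return true;
  }

 private:
  int limit_batches_;
  int num_batches_ = 0;
};
```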


Line 84: /// queue. An error code is returned which causes the sender to retry the previous batch.
> sounds complicated. is that really necessary?
I'm not sure which part you're referring to - but yes, in general this is how the flow control is implemented. The pending sender list replaces the set of blocked threads from Thrift, and is lighter weight because it gives us the ability to discard the row batch payload.

The error code is used to signal to the RPC layer on the sender side that the RPC should be retried. This has the benefit of making the retry transparent to the caller code.
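The retry described above can be hidden from calling code by wrapping the send in a loop inside the RPC layer. This is a minimal sketch under several assumptions:

- a synchronous callback stands in for KRPC's asynchronous completion;
- `RpcStatus::kRecvrQueueFull` is a hypothetical code, because the patch's actual error code is not named in this thread;
- `max_attempts` is an illustrative bound, not something the patch is known to have.

```cpp
#include <cassert>
#include <functional>

// Hypothetical status codes; the patch's real error code is not named here.
enum class RpcStatus { kOk, kRecvrQueueFull, kFailed };

// Re-sends the same batch while the receiver answers "queue full". The caller
// sees only kOk or a genuine failure, so the retry is transparent to it.
// 'max_attempts' bounds the loop in this sketch.
RpcStatus SendWithRetry(const std::function<RpcStatus()>& send_once, int max_attempts) {
  assert(max_attempts > 0);
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    RpcStatus s = send_once();
    if (s != RpcStatus::kRecvrQueueFull) return s;
    // No backoff: this sketch assumes the receiver delays its "queue full"
    // reply until it is ready to accept the retried batch.
  }
  return RpcStatus::kFailed;
}
```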


Line 113: /// time-out and cancel itself. However, it is usual that the coordinator will initiate
> i guess that extra complication is necessary because the non-existence of a
Exactly.


Line 124: /// The sender has a default timeout of 2 minutes for TransmitData() calls. If the timeout
> why so long?
The former case (the receiver going away) is detected in almost all cases. If the receiver has shown up and been closed, the sender will find that in the closed stream cache.


Line 131: /// immediately. Both of these cases are designed so that the sender can prepare a new
> in the queued case, it seems like the response should go out when the queue
In the queued batch case, we could look to see if the queue had any more capacity before responding to the sender. However, it's hard to know how much capacity a sender needs for its next batch (as batches with strings can vary in size a lot). This strategy is optimistic - if the receiver is able to accept a batch, we presume that it can accept the next one as well. In my tests, this worked better than pessimistically pausing senders until we knew for sure there was room for their particular batch.


PS3, Line 135: idea
> I would rather use the word 'assumption' here.
Done


Line 137: /// notification should not exceed 60s and so the sender will not time out.
> that also sounds complicated.
The sender needs a timeout, after which it will fail, so the receiver has to try to respond within that timeout. That's all that's going on here: picking a timeout, and then picking a response time that (modulo standard distributed-systems issues) is very likely to be less than that timeout. This avoids false negatives.
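The timing argument above reduces to one inequality between two constants. The sketch below uses the figures from the header comments under review: a 2-minute sender timeout and a 60s receiver response bound. The constant and function names are hypothetical.

```cpp
#include <cassert>
#include <chrono>

using std::chrono::seconds;

// Figures from the header comments under review; names are hypothetical.
constexpr seconds kSenderRpcTimeout{120};         // sender gives up after 2 minutes
constexpr seconds kMaxReceiverResponseDelay{60};  // receiver aims to answer within 60s

// The gap between the two absorbs network and scheduling delay, so a receiver
// that is slow but alive is not mistaken for a dead one.
static_assert(kMaxReceiverResponseDelay < kSenderRpcTimeout,
              "receiver must answer before the sender times out");

// Receiver-side check for a parked sender: once it has waited this long, send it
// some response now rather than letting its RPC time out.
inline bool MustRespondNow(seconds waited) {
  assert(waited >= seconds(0));
  return waited >= kMaxReceiverResponseDelay;
}
```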


PS3, Line 212: TRANSMIT_DATA_TIMEOUT_SECONDS
> I think there needs to be some comment clearly distinguishing between this 
I removed this because it was so similar to datastream_sender_timeout_ms that it made sense to just use the flag for both the initial timeout, and the row-batch processing timeout.


Line 245:   void AddData(const TUniqueId& fragment_instance_id, TransmitDataCtx&& payload);
> in other words, pass in a pointer and stipulate that AddData owns it.
I feel that rvalues do make ownership explicit. You can't pass a non-temporary rvalue without move(X), which makes it really clear.

I think it's better to make it clear from the code (you cannot pass a TransmitDataCtx without relinquishing ownership) than it is to use the comments to convey a convention.
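The ownership argument fits in a few lines. This sketch uses a stand-in `TransmitDataCtx` (the real type is defined in the patch) and a hypothetical `StreamMgrSketch`. Because `AddData()` takes an rvalue reference, passing a named context only compiles through `std::move()`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the patch's TransmitDataCtx: move-only because of the unique_ptr.
struct TransmitDataCtx {
  std::unique_ptr<std::string> payload;
};

class StreamMgrSketch {
 public:
  // An rvalue-reference parameter: passing a named TransmitDataCtx only compiles
  // via std::move(), so the hand-off is visible at the call site.
  void AddData(TransmitDataCtx&& ctx) {
    assert(ctx.payload != nullptr);
    queued_.push_back(std::move(ctx));  // actually take ownership
  }
  std::size_t size() const { return queued_.size(); }

 private:
  std::vector<TransmitDataCtx> queued_;
};
```

Calling `mgr.AddData(ctx)` on a named `ctx` is rejected by the compiler. Note that an rvalue-reference parameter only *permits* the callee to take ownership. Taking a move-only type by value, or a `std::unique_ptr`, would guarantee that the transfer happens.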


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 129: pending_senders_
> Maybe this could be future work, but I foresee a need to cap this at some n
Hm - why do you think failing the query's a good idea? If the receiver is slow relative to the sender, this queue will grow, but that's not an error.


PS3, Line 137: SpinLock
> Isn't this a rather large amount of work to have under a spinlock?
Although it's quite a few lines, I don't think it's that much work. What operation looks expensive to you?


PS3, Line 191: // num_remaining_senders_ could be 0 because an AddBatch() can arrive *after* a
             :     // EndDataStream() RPC for the same sender, due to asynchrony on the sender side (the
             :     // sender gets closed or cancelled, but doesn't wait for the outstanding TransmitData()
             :     // to complete before trying to close the channel).
> If this is the only case where num_remaining_senders_ can be 0, then is the
We have to clean up the RPC state and responding seems a good way to do that.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.h
File be/src/runtime/data-stream-recvr.h:

PS3, Line 174: good_bytes_received_counter_
> bytes_accepted_counter_ ?
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/descriptors.h
File be/src/runtime/descriptors.h:

Line 546:   /// Serialize to the row_tuples field of a RowBatchPb.
> something called ToProto should materialize a message that corresponds to t
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 80:     ExecEnv::GetInstance()->stream_mgr()->AddData(finst_id, move(payload));
> This is a tad bit confusing. Why is there not a necessity to have a Respond
AddData() will always respond to the RPC, but may do so asynchronously, so we can't use a return value here. 

If AddData() is not called, we have to respond to the RPC right here to make sure it doesn't hang on the client. I added some comments.
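The replies above rest on one rule: every inbound RPC is answered exactly once, either inline or by whoever takes over the context. Below is a minimal sketch of that rule. The names `RpcCtxSketch` and `HandleTransmitData` are hypothetical. The "malformed request" failure path is made up; it stands in for whatever prevents `AddData()` from being called.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical inbound-RPC context. Every call must be answered exactly once;
// a call that is never answered leaves the client waiting.
class RpcCtxSketch {
 public:
  void RespondSuccess() { Respond("OK"); }
  void RespondFailure(const std::string& msg) { Respond(msg); }
  bool responded() const { return responded_; }
  const std::string& response() const { return response_; }

 private:
  void Respond(const std::string& r) {
    assert(!responded_ && "RPC answered twice");
    responded_ = true;
    response_ = r;
  }
  bool responded_ = false;
  std::string response_;
};

// Handler shape: once the context is handed to add_data, that callee owns the
// duty to respond (possibly later, from another thread). On every path where
// it is not handed off, the handler must respond itself.
void HandleTransmitData(RpcCtxSketch* ctx, bool request_ok,
                        const std::function<void(RpcCtxSketch*)>& add_data) {
  if (request_ok) {
    add_data(ctx);
    return;
  }
  ctx->RespondFailure("malformed request");
}
```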


PS3, Line 99: // TODO: Check return
> What he said.
Done


Line 118:     context->GetInboundSidecar(filter.header.directory_sidecar_idx(), &filter.directory);
> Same here, check return status.
Done


Line 155:   context->RespondSuccess();
> No need to return status.SetTStatus(&return_val) ?
Good catch, done.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

PS3, Line 1925: move
> Include what you use? <utility>
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala_internal_service.proto
File be/src/service/impala_internal_service.proto:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> let's move all .proto files into common/proto, that'll make it a lot easier
The tooling is set up so that proto files fit more neatly in the existing source directories, and during development I've found it convenient to have the generated files in the same directory as other source files. I'd rather keep them in the source dirs - finding them is not hard.


Line 32:   // Tuples for all rows
> incomprehensible comment.
Do you mean like this:

  message TupleId {
    int32 id = 1;
  }
?

If so I don't quite see the benefit vs the extra verbosity.


Line 46:   // Of type CatalogObjects.THdfsCompression (TODO(KRPC): native enum)
> why not use the enum?
We use THdfsCompression here because that's used elsewhere in the code where it's harder to change the serialization format (i.e. in Java). I could set up a mirror compression type in protobufs that's used for this field specifically, but that would be brittle (got to make sure the translation is done accurately from thrift<->proto and that no cases are missed). I think it's better to wait until we can use a proto enum everywhere.


Line 50: message TransmitDataRequestPb {
> you left out the service version (example: ImpalaInternalServiceVersion in 
I think there are some details that are underspecified in our current implementation that need some discussion - for example, how should version mismatches be indicated to the client? KRPC has an out-of-band error mechanism that we can use. 

Alternatively, we can add versions to response objects, with the convention that if the replied version != the requested version, no fields will be set and the RPC should fail. 

In Thrift only the request objects have a version field.

What should we do with messages that aren't parameters? Do they have version fields? Otherwise, how should they be versioned? (Say we want to add a field to StatusPb). Do we have StatusPbV1 etc?

I think this warrants a bit more discussion. I've changed all the fields to 'optional' (that's the only possibility in protobuf v3, out of interest). I think that gives us some future-proofing while we get the details nailed down, since the absence of a protocol marker can be interpreted as V0.
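The sketch below illustrates one of the conventions floated above. An unset version is read as V0. A response whose version differs from the request's carries no fields and fails the call. This is not what the patch implements. `std::optional` (C++17) stands in for an unset proto field, and all names are hypothetical.

```cpp
#include <cassert>
#include <optional>

// Hypothetical message shapes; std::optional models a proto field that may be unset.
struct RequestSketch {
  std::optional<int> protocol_version;
};
struct ResponseSketch {
  std::optional<int> protocol_version;
  bool has_payload = false;
};

constexpr int kServerVersion = 1;

// An unset version is read as V0, so today's all-optional messages act as an
// implicit version 0.
int EffectiveVersion(const std::optional<int>& v) { return v.value_or(0); }

// Server: echo the version it speaks; fill the payload only on an exact match.
ResponseSketch Serve(const RequestSketch& req) {
  ResponseSketch resp;
  resp.protocol_version = kServerVersion;
  resp.has_payload = EffectiveVersion(req.protocol_version) == kServerVersion;
  return resp;
}

// Client: fail the RPC if the replied version differs from the requested one.
bool ClientAccepts(const RequestSketch& req, const ResponseSketch& resp) {
  return EffectiveVersion(resp.protocol_version) == EffectiveVersion(req.protocol_version);
}
```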


Line 115: service ExecControlService {
> why not put the services in separate .proto files
Done


Line 131: service DataStreamService {
> would it make sense to have separate patches for the two services? it feels
I've started this, but it's a bit time-consuming moving thousands of lines of changes between commits. For now, please review the data stream changes, and eventually the control svc changes will move into a different patch.


Line 140:   // Called by the coordinator to deliver global runtime filters to fragment
> i consider the filters to be control structures. why wouldn't they be in ex
I don't think they are control structures. They have large payloads and contain tuple data.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impalad-main.cc
File be/src/service/impalad-main.cc:

PS3, Line 79: exec_env->Init();
> Need to check returned status and fail if necessary.
Done


-- 
To view, visit http://gerrit.cloudera.org:8080/5888
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Marcel Kornacker (Code Review)" <ge...@cloudera.org>.
Marcel Kornacker has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 3:

(17 comments)

first look

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 60: /// first batch. Since the sender may start sending before the receiver is ready, the data
we could relatively easily remove that restriction; the execrpc changes are going to reduce the startup time.


Line 77: /// fixed-size buffer for later processing, or discard the batch if the buffer is full. In
how can the is-full case come up? shouldn't flow control keep the sender from getting to that point?


Line 84: /// queue. An error code is returned which causes the sender to retry the previous batch.
sounds complicated. is that really necessary?


Line 113: /// time-out and cancel itself. However, it is usual that the coordinator will initiate
i guess that extra complication is necessary because the non-existence of an in-flight query id doesn't mean it won't show up later.


Line 124: /// The sender has a default timeout of 2 minutes for TransmitData() calls. If the timeout
why so long?

can we not distinguish "the rpc failed because the receiver went away" and "the data stream is paused because the client hasn't called fetch in a while"?


Line 131: /// immediately. Both of these cases are designed so that the sender can prepare a new
in the queued case, it seems like the response should go out when the queue length drops below a limit, no?


Line 137: /// notification should not exceed 60s and so the sender will not time out.
that also sounds complicated.


Line 228:   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id if the
there is no dest_node_id, sender_id.


Line 245:   void AddData(const TUniqueId& fragment_instance_id, TransmitDataCtx&& payload);
don't use an rvalue param here. rather, make ownership (and change of ownership) explicit.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/descriptors.h
File be/src/runtime/descriptors.h:

Line 546:   /// Serialize to the row_tuples field of a RowBatchPb.
something called ToProto should materialize a message that corresponds to this class. that's not the case here. i don't see why RowBatch::ToProto wouldn't do this itself.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala_internal_service.proto
File be/src/service/impala_internal_service.proto:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
let's move all .proto files into common/proto, that'll make it a lot easier to find them.


Line 32:   // Tuples for all rows
incomprehensible comment.

since protos don't allow typedefs, we should probably define messages for all the ids, that makes it clear in the code what it's meant to represent.


Line 46:   // Of type CatalogObjects.THdfsCompression (TODO(KRPC): native enum)
why not use the enum?


Line 50: message TransmitDataRequestPb {
you left out the service version (example: ImpalaInternalServiceVersion in ImpalaInternalService.thrift).

for all params proto:
- must include the service version
- all fields are optional
- the comment needs to indicate whether it's optional or required in v1

for all result protos: the latter two points apply as well


Line 115: service ExecControlService {
why not put the services in separate .proto files


Line 131: service DataStreamService {
would it make sense to have separate patches for the two services? it feels like the data stream service is pretty much separate from the control service, and in particular it doesn't conflict with the ongoing coordinator/"control" changes.


Line 140:   // Called by the coordinator to deliver global runtime filters to fragment
i consider the filters to be control structures. why wouldn't they be in execcontrolservice?


-- 

Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#2).

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................

IMPALA-4856: Port ImpalaInternalService to KRPC

This patch ports the ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two KRPC services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting. The second,
  DataStreamService, handles large-payload RPCs for transmitting runtime
  filters and row batches between hosts. The separation allows us to
  dedicate resources to each service, rather than have them compete for
  the same thread pool and queue space.

* In the DataStreamService, all RPCs use 'native'
  protobuf. ImpalaInternalService RPCs remain wrappers of Thrift data
  structures.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools and synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter(), CancelPlanFragment() and TransmitData() RPCs are all
  sent asynchronously using KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous RPCs,
  and to more properly handle the case where receiver queues are
  full. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Blocking no longer
  needs to be handled on the sender side, so the complex promise-based
  machinery is gone. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins to clean up how ImpalaServer instances are
  created (by removing CreateImpalaServer) and to clarify the relationship
  between ExecEnv and ImpalaServer. ImpalaServer now follows the standard
  construct->Init()->Start()->Join() lifecycle that we use for other
  services.
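The early-sender handling described in the FindRecvr() item can be sketched as a map from receiver to waiting senders. The map is drained when the receiver registers and swept periodically for senders that have waited too long. All names here are hypothetical, and receivers and senders are plain ints. In the patch the timeout comes from FLAGS_datastream_sender_timeout_ms, and receivers are presumably identified by fragment instance id rather than by ints.

```cpp
#include <cassert>
#include <chrono>
#include <iterator>
#include <map>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical bookkeeping for senders whose receiver has not been created yet.
class EarlySenders {
 public:
  // A sender arrived before its receiver: park it instead of blocking a thread.
  void Park(int recvr_key, int sender_id, Clock::time_point now) {
    waiting_[recvr_key].push_back(Waiter{sender_id, now});
  }

  // The receiver registered: return every sender that was waiting for it.
  std::vector<int> OnReceiverCreated(int recvr_key) {
    std::vector<int> ready;
    auto it = waiting_.find(recvr_key);
    if (it == waiting_.end()) return ready;
    for (const Waiter& w : it->second) ready.push_back(w.sender_id);
    waiting_.erase(it);
    return ready;
  }

  // Periodic maintenance: expire senders whose receiver never showed up.
  std::vector<int> ExpireOlderThan(Clock::duration timeout, Clock::time_point now) {
    assert(timeout > Clock::duration::zero());
    std::vector<int> expired;
    for (auto it = waiting_.begin(); it != waiting_.end();) {
      std::vector<Waiter> keep;
      for (const Waiter& w : it->second) {
        if (now - w.arrived >= timeout) {
          expired.push_back(w.sender_id);
        } else {
          keep.push_back(w);
        }
      }
      it->second.swap(keep);
      it = it->second.empty() ? waiting_.erase(it) : std::next(it);
    }
    return expired;
  }

 private:
  struct Waiter {
    int sender_id;
    Clock::time_point arrived;
  };
  std::map<int, std::vector<Waiter>> waiting_;
};
```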

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
---
M .clang-format
M CMakeLists.txt
M be/generated-sources/gen-cpp/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/expr-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc.h
M be/src/rpc/thrift-server-test.cc
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/descriptors.cc
M be/src/runtime/descriptors.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/request-pool-service.h
M be/src/scheduling/scheduler-test-util.h
M be/src/service/CMakeLists.txt
M be/src/service/fe-support.cc
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
A be/src/service/impala_internal_service.proto
M be/src/service/impalad-main.cc
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/runtime-profile.cc
M be/src/util/runtime-profile.h
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
65 files changed, 2,267 insertions(+), 1,610 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/88/5888/2
-- 

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>

[Impala-ASF-CR] IMPALA-4856: Port ImpalaInternalService to KRPC

Posted by "Henry Robinson (Code Review)" <ge...@cloudera.org>.
Henry Robinson has uploaded a new patch set (#4).

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................

IMPALA-4856: Port ImpalaInternalService to KRPC

This patch ports the ImpalaInternalService to KRPC.

* ImpalaInternalService is split into two KRPC services. The first,
  ImpalaInternalService, deals with control messages for plan fragment
  instance execution, cancellation and reporting. The second,
  DataStreamService, handles large-payload RPCs for transmitting runtime
  filters and row batches between hosts. The separation allows us to
  dedicate resources to each service, rather than have them compete for
  the same thread pool and queue space.

* In the DataStreamService, all RPCs use 'native'
  protobuf. ImpalaInternalService RPCs remain wrappers of Thrift data
  structures.

* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc
  classes. Previously, Impala used fixed-size thread pools and synchronous
  RPCs to achieve some parallelism for 'broadcast' RPCs like filter
  propagation, or a dedicated per-sender+receiver pair thread on the
  sender side in the DataStreamSender case. In this patch, the
  PublishFilter(), CancelPlanFragment() and TransmitData() RPCs are all
  sent asynchronously using KRPC's thread pools.

* The TransmitData() protocol has changed to adapt to asynchronous RPCs,
  and to more properly handle the case where receiver queues are
  full. The full details are in data-stream-mgr.h.

* As a result, DataStreamSender no longer creates a
  thread-per-connection on the sender side.

* Both tuple transmission and runtime filter publication use sidecars to
  minimise the number of copies and serialization steps required.

* A large portion of this patch is the replacement of TRowBatch with its
  Protobuf equivalent, RowBatchPb. The replacement is a literal port of
  the data structure, and row-batch-test, row-batch-list-test and
  row-batch-serialize-benchmark continue to execute without logic
  changes.

* Simplify FindRecvr() logic in DataStreamManager. Blocking no longer
  needs to be handled on the sender side, so the complex promise-based
  machinery is gone. Instead, all senders with no receiver are added to a
  per-receiver list, which is processed when the receiver arrives. If it
  does not arrive promptly, the DataStreamManager cleans them up after
  FLAGS_datastream_sender_timeout_ms.

* This patch also begins to clean up how ImpalaServer instances are
  created (by removing CreateImpalaServer) and to clarify the relationship
  between ExecEnv and ImpalaServer. ImpalaServer now follows the standard
  construct->Init()->Start()->Join() lifecycle that we use for other
  services.
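The construct->Init()->Start()->Join() lifecycle separates fallible setup from thread launch. Below is a minimal sketch using a hypothetical `ServiceSketch`. A `bool` stands in for Impala's `Status`, and the caller is expected to check the result of `Init()` before calling `Start()`.

```cpp
#include <cassert>
#include <thread>

// Hypothetical service following construct -> Init() -> Start() -> Join().
class ServiceSketch {
 public:
  ~ServiceSketch() { Join(); }  // never destroy a joinable std::thread

  // Fallible setup; returns false (standing in for a non-OK Status) on error.
  bool Init(int port) {
    if (port <= 0) return false;
    port_ = port;
    initialized_ = true;
    return true;
  }

  // Launches the service's thread; refuses if Init() failed or already started.
  bool Start() {
    if (!initialized_ || worker_.joinable()) return false;
    worker_ = std::thread([this] { served_ = true; });
    return true;
  }

  // Blocks until the service's thread exits.
  void Join() {
    if (worker_.joinable()) worker_.join();
  }

  bool served() const { return served_; }

 private:
  int port_ = 0;
  bool initialized_ = false;
  bool served_ = false;  // written by the worker, read only after Join()
  std::thread worker_;
};
```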

TESTING
-------

* New tests added to rpc-mgr-test.

TO DO
-----

* Re-enable throughput and latency measurements per data-stream sender
  when that information is exposed from KRPC (KUDU-1738).

* TLS and Kerberos are still not supported by KRPC in this patch.

Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
---
M .clang-format
M CMakeLists.txt
M be/generated-sources/gen-cpp/CMakeLists.txt
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/benchmarks/expr-benchmark.cc
M be/src/benchmarks/row-batch-serialize-benchmark.cc
M be/src/common/init.cc
M be/src/common/status.cc
M be/src/common/status.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/common.proto
M be/src/rpc/rpc-mgr-test.cc
M be/src/rpc/rpc-mgr.h
M be/src/rpc/rpc.h
M be/src/rpc/thrift-server-test.cc
D be/src/runtime/backend-client.h
M be/src/runtime/client-cache-types.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-mgr.cc
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.cc
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.cc
M be/src/runtime/data-stream-sender.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/plan-fragment-executor.h
M be/src/runtime/row-batch-serialize-test.cc
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-state.cc
M be/src/runtime/runtime-state.h
M be/src/scheduling/request-pool-service.h
M be/src/scheduling/scheduler-test-util.h
M be/src/service/CMakeLists.txt
A be/src/service/data_stream_service.proto
A be/src/service/exec_control_service.proto
M be/src/service/fe-support.cc
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/service/impalad-main.cc
M be/src/testutil/fault-injection-util.h
M be/src/testutil/in-process-servers.cc
M be/src/testutil/in-process-servers.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/hdfs-util-test.cc
M be/src/util/runtime-profile.cc
M be/src/util/runtime-profile.h
M common/thrift/CMakeLists.txt
M common/thrift/ImpalaInternalService.thrift
M common/thrift/Results.thrift
M common/thrift/generate_error_codes.py
M tests/custom_cluster/test_breakpad.py
M tests/custom_cluster/test_exchange_delays.py
M tests/custom_cluster/test_rpc_timeout.py
67 files changed, 2,353 insertions(+), 1,646 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/88/5888/4
-- 

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 4
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <ma...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sa...@cloudera.com>