You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:58 UTC

[46/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

IMPALA-7006: Add KRPC folders from kudu@334ecafd

cp -a ~/checkout/kudu/src/kudu/{rpc,util,security} be/src/kudu/

Change-Id: I232db2b4ccf5df9aca87b21dea31bfb2735d1ab7
Reviewed-on: http://gerrit.cloudera.org:8080/10757
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Lars Volker <lv...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fcf190c4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fcf190c4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fcf190c4

Branch: refs/heads/master
Commit: fcf190c4de1fcc291a5356634fd7cd12efa64852
Parents: 39870d4
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jul 3 15:10:52 2018 -0700
Committer: Lars Volker <lv...@cloudera.com>
Committed: Thu Jul 12 21:35:42 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/CMakeLists.txt                  |  138 +
 be/src/kudu/rpc/acceptor_pool.cc                |  175 ++
 be/src/kudu/rpc/acceptor_pool.h                 |   84 +
 be/src/kudu/rpc/blocking_ops.cc                 |  126 +
 be/src/kudu/rpc/blocking_ops.h                  |   58 +
 be/src/kudu/rpc/client_negotiation.cc           |  853 ++++++
 be/src/kudu/rpc/client_negotiation.h            |  263 ++
 be/src/kudu/rpc/connection.cc                   |  767 ++++++
 be/src/kudu/rpc/connection.h                    |  391 +++
 be/src/kudu/rpc/connection_id.cc                |   85 +
 be/src/kudu/rpc/connection_id.h                 |   84 +
 be/src/kudu/rpc/constants.cc                    |   37 +
 be/src/kudu/rpc/constants.h                     |   60 +
 be/src/kudu/rpc/exactly_once_rpc-test.cc        |  629 +++++
 be/src/kudu/rpc/inbound_call.cc                 |  345 +++
 be/src/kudu/rpc/inbound_call.h                  |  286 ++
 be/src/kudu/rpc/messenger.cc                    |  502 ++++
 be/src/kudu/rpc/messenger.h                     |  460 ++++
 be/src/kudu/rpc/mt-rpc-test.cc                  |  318 +++
 be/src/kudu/rpc/negotiation-test.cc             | 1346 ++++++++++
 be/src/kudu/rpc/negotiation.cc                  |  327 +++
 be/src/kudu/rpc/negotiation.h                   |   58 +
 be/src/kudu/rpc/outbound_call.cc                |  531 ++++
 be/src/kudu/rpc/outbound_call.h                 |  348 +++
 be/src/kudu/rpc/periodic-test.cc                |  295 +++
 be/src/kudu/rpc/periodic.cc                     |  219 ++
 be/src/kudu/rpc/periodic.h                      |  215 ++
 be/src/kudu/rpc/protoc-gen-krpc.cc              |  691 +++++
 be/src/kudu/rpc/proxy.cc                        |  116 +
 be/src/kudu/rpc/proxy.h                         |  126 +
 be/src/kudu/rpc/reactor-test.cc                 |  112 +
 be/src/kudu/rpc/reactor.cc                      |  918 +++++++
 be/src/kudu/rpc/reactor.h                       |  427 +++
 be/src/kudu/rpc/remote_method.cc                |   53 +
 be/src/kudu/rpc/remote_method.h                 |   51 +
 be/src/kudu/rpc/remote_user.cc                  |   40 +
 be/src/kudu/rpc/remote_user.h                   |   99 +
 be/src/kudu/rpc/request_tracker-test.cc         |   86 +
 be/src/kudu/rpc/request_tracker.cc              |   55 +
 be/src/kudu/rpc/request_tracker.h               |   87 +
 be/src/kudu/rpc/response_callback.h             |   31 +
 be/src/kudu/rpc/result_tracker.cc               |  595 +++++
 be/src/kudu/rpc/result_tracker.h                |  401 +++
 be/src/kudu/rpc/retriable_rpc.h                 |  296 +++
 be/src/kudu/rpc/rpc-bench.cc                    |  298 +++
 be/src/kudu/rpc/rpc-test-base.h                 |  661 +++++
 be/src/kudu/rpc/rpc-test.cc                     | 1364 ++++++++++
 be/src/kudu/rpc/rpc.cc                          |  101 +
 be/src/kudu/rpc/rpc.h                           |  221 ++
 be/src/kudu/rpc/rpc_context.cc                  |  217 ++
 be/src/kudu/rpc/rpc_context.h                   |  245 ++
 be/src/kudu/rpc/rpc_controller.cc               |  177 ++
 be/src/kudu/rpc/rpc_controller.h                |  282 ++
 be/src/kudu/rpc/rpc_header.proto                |  365 +++
 be/src/kudu/rpc/rpc_introspection.proto         |  110 +
 be/src/kudu/rpc/rpc_service.h                   |   47 +
 be/src/kudu/rpc/rpc_sidecar.cc                  |  115 +
 be/src/kudu/rpc/rpc_sidecar.h                   |   73 +
 be/src/kudu/rpc/rpc_stub-test.cc                |  726 ++++++
 be/src/kudu/rpc/rpcz_store.cc                   |  272 ++
 be/src/kudu/rpc/rpcz_store.h                    |   74 +
 be/src/kudu/rpc/rtest.proto                     |  160 ++
 be/src/kudu/rpc/rtest_diff_package.proto        |   26 +
 be/src/kudu/rpc/sasl_common.cc                  |  470 ++++
 be/src/kudu/rpc/sasl_common.h                   |  158 ++
 be/src/kudu/rpc/sasl_helper.cc                  |  134 +
 be/src/kudu/rpc/sasl_helper.h                   |  109 +
 be/src/kudu/rpc/serialization.cc                |  223 ++
 be/src/kudu/rpc/serialization.h                 |   88 +
 be/src/kudu/rpc/server_negotiation.cc           |  989 +++++++
 be/src/kudu/rpc/server_negotiation.h            |  259 ++
 be/src/kudu/rpc/service_if.cc                   |  160 ++
 be/src/kudu/rpc/service_if.h                    |  134 +
 be/src/kudu/rpc/service_pool.cc                 |  234 ++
 be/src/kudu/rpc/service_pool.h                  |  117 +
 be/src/kudu/rpc/service_queue-test.cc           |  151 ++
 be/src/kudu/rpc/service_queue.cc                |  145 ++
 be/src/kudu/rpc/service_queue.h                 |  225 ++
 be/src/kudu/rpc/transfer.cc                     |  283 ++
 be/src/kudu/rpc/transfer.h                      |  212 ++
 be/src/kudu/rpc/user_credentials.cc             |   64 +
 be/src/kudu/rpc/user_credentials.h              |   53 +
 be/src/kudu/security/CMakeLists.txt             |  141 +
 be/src/kudu/security/ca/cert_management-test.cc |  294 +++
 be/src/kudu/security/ca/cert_management.cc      |  423 +++
 be/src/kudu/security/ca/cert_management.h       |  226 ++
 be/src/kudu/security/cert-test.cc               |  165 ++
 be/src/kudu/security/cert.cc                    |  301 +++
 be/src/kudu/security/cert.h                     |  119 +
 be/src/kudu/security/crypto-test.cc             |  257 ++
 be/src/kudu/security/crypto.cc                  |  276 ++
 be/src/kudu/security/crypto.h                   |  103 +
 be/src/kudu/security/init.cc                    |  465 ++++
 be/src/kudu/security/init.h                     |   84 +
 be/src/kudu/security/kerberos_util.cc           |   37 +
 be/src/kudu/security/kerberos_util.h            |   29 +
 be/src/kudu/security/krb5_realm_override.cc     |  105 +
 be/src/kudu/security/openssl_util.cc            |  322 +++
 be/src/kudu/security/openssl_util.h             |  217 ++
 be/src/kudu/security/openssl_util_bio.h         |  129 +
 be/src/kudu/security/security-test-util.cc      |  103 +
 be/src/kudu/security/security-test-util.h       |   56 +
 be/src/kudu/security/security_flags.cc          |   42 +
 be/src/kudu/security/security_flags.h           |   36 +
 be/src/kudu/security/simple_acl.cc              |   89 +
 be/src/kudu/security/simple_acl.h               |   58 +
 be/src/kudu/security/test/mini_kdc-test.cc      |  144 ++
 be/src/kudu/security/test/mini_kdc.cc           |  315 +++
 be/src/kudu/security/test/mini_kdc.h            |  134 +
 be/src/kudu/security/test/test_certs.cc         |  969 +++++++
 be/src/kudu/security/test/test_certs.h          |   86 +
 be/src/kudu/security/test/test_pass.cc          |   40 +
 be/src/kudu/security/test/test_pass.h           |   37 +
 be/src/kudu/security/tls_context.cc             |  520 ++++
 be/src/kudu/security/tls_context.h              |  202 ++
 be/src/kudu/security/tls_handshake-test.cc      |  390 +++
 be/src/kudu/security/tls_handshake.cc           |  274 ++
 be/src/kudu/security/tls_handshake.h            |  171 ++
 be/src/kudu/security/tls_socket-test.cc         |  366 +++
 be/src/kudu/security/tls_socket.cc              |  185 ++
 be/src/kudu/security/tls_socket.h               |   60 +
 be/src/kudu/security/token-test.cc              |  677 +++++
 be/src/kudu/security/token.proto                |   97 +
 be/src/kudu/security/token_signer.cc            |  299 +++
 be/src/kudu/security/token_signer.h             |  316 +++
 be/src/kudu/security/token_signing_key.cc       |  110 +
 be/src/kudu/security/token_signing_key.h        |  103 +
 be/src/kudu/security/token_verifier.cc          |  173 ++
 be/src/kudu/security/token_verifier.h           |  126 +
 be/src/kudu/security/x509_check_host.cc         |  439 ++++
 be/src/kudu/security/x509_check_host.h          |   50 +
 be/src/kudu/util/CMakeLists.txt                 |  482 ++++
 be/src/kudu/util/alignment.h                    |   28 +
 be/src/kudu/util/array_view.h                   |  133 +
 be/src/kudu/util/async_logger.cc                |  151 ++
 be/src/kudu/util/async_logger.h                 |  206 ++
 be/src/kudu/util/async_util-test.cc             |  129 +
 be/src/kudu/util/async_util.h                   |   99 +
 be/src/kudu/util/atomic-test.cc                 |  135 +
 be/src/kudu/util/atomic.cc                      |   56 +
 be/src/kudu/util/atomic.h                       |  322 +++
 be/src/kudu/util/auto_release_pool.h            |   99 +
 be/src/kudu/util/barrier.h                      |   68 +
 be/src/kudu/util/bit-stream-utils.h             |  150 ++
 be/src/kudu/util/bit-stream-utils.inline.h      |  211 ++
 be/src/kudu/util/bit-util-test.cc               |   45 +
 be/src/kudu/util/bit-util.h                     |   57 +
 be/src/kudu/util/bitmap-test.cc                 |  230 ++
 be/src/kudu/util/bitmap.cc                      |  136 +
 be/src/kudu/util/bitmap.h                       |  219 ++
 be/src/kudu/util/blocking_queue-test.cc         |  249 ++
 be/src/kudu/util/blocking_queue.h               |  256 ++
 be/src/kudu/util/bloom_filter-test.cc           |   92 +
 be/src/kudu/util/bloom_filter.cc                |   89 +
 be/src/kudu/util/bloom_filter.h                 |  254 ++
 be/src/kudu/util/boost_mutex_utils.h            |   45 +
 be/src/kudu/util/cache-bench.cc                 |  191 ++
 be/src/kudu/util/cache-test.cc                  |  246 ++
 be/src/kudu/util/cache.cc                       |  572 ++++
 be/src/kudu/util/cache.h                        |  216 ++
 be/src/kudu/util/cache_metrics.cc               |   69 +
 be/src/kudu/util/cache_metrics.h                |   42 +
 be/src/kudu/util/callback_bind-test.cc          |  119 +
 be/src/kudu/util/coding-inl.h                   |  120 +
 be/src/kudu/util/coding.cc                      |  142 +
 be/src/kudu/util/coding.h                       |  113 +
 .../kudu/util/compression/compression-test.cc   |   90 +
 be/src/kudu/util/compression/compression.proto  |   29 +
 .../kudu/util/compression/compression_codec.cc  |  286 ++
 .../kudu/util/compression/compression_codec.h   |   78 +
 be/src/kudu/util/condition_variable.cc          |  142 +
 be/src/kudu/util/condition_variable.h           |  118 +
 be/src/kudu/util/countdown_latch-test.cc        |   74 +
 be/src/kudu/util/countdown_latch.h              |  137 +
 be/src/kudu/util/cow_object.cc                  |   34 +
 be/src/kudu/util/cow_object.h                   |  437 ++++
 be/src/kudu/util/crc-test.cc                    |  112 +
 be/src/kudu/util/crc.cc                         |   56 +
 be/src/kudu/util/crc.h                          |   43 +
 be/src/kudu/util/curl_util.cc                   |  130 +
 be/src/kudu/util/curl_util.h                    |   92 +
 be/src/kudu/util/debug-util-test.cc             |  458 ++++
 be/src/kudu/util/debug-util.cc                  |  800 ++++++
 be/src/kudu/util/debug-util.h                   |  321 +++
 be/src/kudu/util/debug/leak_annotations.h       |   84 +
 be/src/kudu/util/debug/leakcheck_disabler.h     |   48 +
 be/src/kudu/util/debug/sanitizer_scopes.h       |   47 +
 be/src/kudu/util/debug/trace_event.h            | 1501 +++++++++++
 be/src/kudu/util/debug/trace_event_impl.cc      | 2436 ++++++++++++++++++
 be/src/kudu/util/debug/trace_event_impl.h       |  726 ++++++
 .../util/debug/trace_event_impl_constants.cc    |   14 +
 be/src/kudu/util/debug/trace_event_memory.h     |   28 +
 .../util/debug/trace_event_synthetic_delay.cc   |  238 ++
 .../util/debug/trace_event_synthetic_delay.h    |  166 ++
 be/src/kudu/util/debug/trace_logging.h          |  132 +
 be/src/kudu/util/debug/unwind_safeness.cc       |  164 ++
 be/src/kudu/util/debug/unwind_safeness.h        |   29 +
 be/src/kudu/util/debug_ref_counted.h            |   56 +
 be/src/kudu/util/decimal_util-test.cc           |   81 +
 be/src/kudu/util/decimal_util.cc                |   89 +
 be/src/kudu/util/decimal_util.h                 |   69 +
 be/src/kudu/util/easy_json-test.cc              |  106 +
 be/src/kudu/util/easy_json.cc                   |  212 ++
 be/src/kudu/util/easy_json.h                    |  190 ++
 be/src/kudu/util/env-test.cc                    | 1173 +++++++++
 be/src/kudu/util/env.cc                         |   93 +
 be/src/kudu/util/env.h                          |  681 +++++
 be/src/kudu/util/env_posix.cc                   | 1852 +++++++++++++
 be/src/kudu/util/env_util-test.cc               |  192 ++
 be/src/kudu/util/env_util.cc                    |  320 +++
 be/src/kudu/util/env_util.h                     |  112 +
 be/src/kudu/util/errno-test.cc                  |   50 +
 be/src/kudu/util/errno.cc                       |   52 +
 be/src/kudu/util/errno.h                        |   36 +
 be/src/kudu/util/faststring-test.cc             |   65 +
 be/src/kudu/util/faststring.cc                  |   72 +
 be/src/kudu/util/faststring.h                   |  259 ++
 be/src/kudu/util/fault_injection.cc             |   78 +
 be/src/kudu/util/fault_injection.h              |   98 +
 be/src/kudu/util/file_cache-stress-test.cc      |  402 +++
 be/src/kudu/util/file_cache-test-util.h         |   92 +
 be/src/kudu/util/file_cache-test.cc             |  361 +++
 be/src/kudu/util/file_cache.cc                  |  654 +++++
 be/src/kudu/util/file_cache.h                   |  209 ++
 be/src/kudu/util/flag_tags-test.cc              |  135 +
 be/src/kudu/util/flag_tags.cc                   |   91 +
 be/src/kudu/util/flag_tags.h                    |  169 ++
 be/src/kudu/util/flag_validators-test.cc        |  252 ++
 be/src/kudu/util/flag_validators.cc             |   67 +
 be/src/kudu/util/flag_validators.h              |  102 +
 be/src/kudu/util/flags-test.cc                  |  109 +
 be/src/kudu/util/flags.cc                       |  604 +++++
 be/src/kudu/util/flags.h                        |   89 +
 be/src/kudu/util/group_varint-inl.h             |  294 +++
 be/src/kudu/util/group_varint-test.cc           |  144 ++
 be/src/kudu/util/group_varint.cc                |   81 +
 be/src/kudu/util/hash_util-test.cc              |   42 +
 be/src/kudu/util/hash_util.h                    |   71 +
 be/src/kudu/util/hdr_histogram-test.cc          |  116 +
 be/src/kudu/util/hdr_histogram.cc               |  501 ++++
 be/src/kudu/util/hdr_histogram.h                |  351 +++
 be/src/kudu/util/hexdump.cc                     |   85 +
 be/src/kudu/util/hexdump.h                      |   34 +
 be/src/kudu/util/high_water_mark.h              |   85 +
 be/src/kudu/util/histogram.proto                |   48 +
 be/src/kudu/util/init.cc                        |   89 +
 be/src/kudu/util/init.h                         |   33 +
 be/src/kudu/util/inline_slice-test.cc           |   88 +
 be/src/kudu/util/inline_slice.h                 |  181 ++
 be/src/kudu/util/int128-test.cc                 |   69 +
 be/src/kudu/util/int128.h                       |   46 +
 be/src/kudu/util/int128_util.h                  |   39 +
 be/src/kudu/util/interval_tree-inl.h            |  444 ++++
 be/src/kudu/util/interval_tree-test.cc          |  353 +++
 be/src/kudu/util/interval_tree.h                |  158 ++
 be/src/kudu/util/jsonreader-test.cc             |  193 ++
 be/src/kudu/util/jsonreader.cc                  |  141 +
 be/src/kudu/util/jsonreader.h                   |   92 +
 be/src/kudu/util/jsonwriter-test.cc             |  216 ++
 be/src/kudu/util/jsonwriter.cc                  |  352 +++
 be/src/kudu/util/jsonwriter.h                   |  102 +
 be/src/kudu/util/jsonwriter_test.proto          |   79 +
 be/src/kudu/util/kernel_stack_watchdog.cc       |  256 ++
 be/src/kudu/util/kernel_stack_watchdog.h        |  290 +++
 be/src/kudu/util/knapsack_solver-test.cc        |  172 ++
 be/src/kudu/util/knapsack_solver.h              |  269 ++
 be/src/kudu/util/locks.cc                       |   47 +
 be/src/kudu/util/locks.h                        |  294 +++
 be/src/kudu/util/logging-test.cc                |  249 ++
 be/src/kudu/util/logging.cc                     |  413 +++
 be/src/kudu/util/logging.h                      |  367 +++
 be/src/kudu/util/logging_callback.h             |   46 +
 be/src/kudu/util/logging_test_util.h            |   60 +
 be/src/kudu/util/maintenance_manager-test.cc    |  369 +++
 be/src/kudu/util/maintenance_manager.cc         |  550 ++++
 be/src/kudu/util/maintenance_manager.h          |  361 +++
 be/src/kudu/util/maintenance_manager.proto      |   54 +
 be/src/kudu/util/make_shared.h                  |   64 +
 be/src/kudu/util/malloc.cc                      |   35 +
 be/src/kudu/util/malloc.h                       |   32 +
 be/src/kudu/util/map-util-test.cc               |  116 +
 be/src/kudu/util/mem_tracker-test.cc            |  285 ++
 be/src/kudu/util/mem_tracker.cc                 |  296 +++
 be/src/kudu/util/mem_tracker.h                  |  272 ++
 be/src/kudu/util/memcmpable_varint-test.cc      |  220 ++
 be/src/kudu/util/memcmpable_varint.cc           |  257 ++
 be/src/kudu/util/memcmpable_varint.h            |   45 +
 be/src/kudu/util/memory/arena-test.cc           |  205 ++
 be/src/kudu/util/memory/arena.cc                |  167 ++
 be/src/kudu/util/memory/arena.h                 |  501 ++++
 be/src/kudu/util/memory/memory.cc               |  339 +++
 be/src/kudu/util/memory/memory.h                |  970 +++++++
 be/src/kudu/util/memory/overwrite.cc            |   42 +
 be/src/kudu/util/memory/overwrite.h             |   33 +
 be/src/kudu/util/metrics-test.cc                |  388 +++
 be/src/kudu/util/metrics.cc                     |  746 ++++++
 be/src/kudu/util/metrics.h                      | 1195 +++++++++
 be/src/kudu/util/minidump-test.cc               |  149 ++
 be/src/kudu/util/minidump.cc                    |  382 +++
 be/src/kudu/util/minidump.h                     |  104 +
 be/src/kudu/util/monotime-test.cc               |  424 +++
 be/src/kudu/util/monotime.cc                    |  334 +++
 be/src/kudu/util/monotime.h                     |  421 +++
 be/src/kudu/util/mt-hdr_histogram-test.cc       |  116 +
 be/src/kudu/util/mt-metrics-test.cc             |  128 +
 be/src/kudu/util/mt-threadlocal-test.cc         |  357 +++
 be/src/kudu/util/mutex.cc                       |  164 ++
 be/src/kudu/util/mutex.h                        |  142 +
 be/src/kudu/util/net/dns_resolver-test.cc       |   59 +
 be/src/kudu/util/net/dns_resolver.cc            |   65 +
 be/src/kudu/util/net/dns_resolver.h             |   62 +
 be/src/kudu/util/net/net_util-test.cc           |  170 ++
 be/src/kudu/util/net/net_util.cc                |  402 +++
 be/src/kudu/util/net/net_util.h                 |  166 ++
 be/src/kudu/util/net/sockaddr.cc                |  136 +
 be/src/kudu/util/net/sockaddr.h                 |   94 +
 be/src/kudu/util/net/socket-test.cc             |   89 +
 be/src/kudu/util/net/socket.cc                  |  590 +++++
 be/src/kudu/util/net/socket.h                   |  178 ++
 be/src/kudu/util/nvm_cache.cc                   |  577 +++++
 be/src/kudu/util/nvm_cache.h                    |   31 +
 be/src/kudu/util/object_pool-test.cc            |   86 +
 be/src/kudu/util/object_pool.h                  |  166 ++
 be/src/kudu/util/oid_generator-test.cc          |   52 +
 be/src/kudu/util/oid_generator.cc               |   65 +
 be/src/kudu/util/oid_generator.h                |   63 +
 be/src/kudu/util/once-test.cc                   |  113 +
 be/src/kudu/util/once.cc                        |   32 +
 be/src/kudu/util/once.h                         |  116 +
 be/src/kudu/util/os-util-test.cc                |   62 +
 be/src/kudu/util/os-util.cc                     |  185 ++
 be/src/kudu/util/os-util.h                      |   72 +
 be/src/kudu/util/path_util-test.cc              |   77 +
 be/src/kudu/util/path_util.cc                   |  122 +
 be/src/kudu/util/path_util.h                    |   63 +
 be/src/kudu/util/pb_util-internal.cc            |  105 +
 be/src/kudu/util/pb_util-internal.h             |  136 +
 be/src/kudu/util/pb_util-test.cc                |  661 +++++
 be/src/kudu/util/pb_util.cc                     | 1088 ++++++++
 be/src/kudu/util/pb_util.h                      |  513 ++++
 be/src/kudu/util/pb_util.proto                  |   45 +
 be/src/kudu/util/pb_util_test.proto             |   29 +
 be/src/kudu/util/process_memory-test.cc         |   75 +
 be/src/kudu/util/process_memory.cc              |  287 +++
 be/src/kudu/util/process_memory.h               |   62 +
 be/src/kudu/util/promise.h                      |   79 +
 be/src/kudu/util/proto_container_test.proto     |   25 +
 be/src/kudu/util/proto_container_test2.proto    |   29 +
 be/src/kudu/util/proto_container_test3.proto    |   33 +
 be/src/kudu/util/protobuf-annotations.h         |   33 +
 be/src/kudu/util/protobuf_util.h                |   39 +
 be/src/kudu/util/protoc-gen-insertions.cc       |   77 +
 be/src/kudu/util/pstack_watcher-test.cc         |  100 +
 be/src/kudu/util/pstack_watcher.cc              |  249 ++
 be/src/kudu/util/pstack_watcher.h               |  101 +
 be/src/kudu/util/random-test.cc                 |  171 ++
 be/src/kudu/util/random.h                       |  252 ++
 be/src/kudu/util/random_util-test.cc            |   75 +
 be/src/kudu/util/random_util.cc                 |   65 +
 be/src/kudu/util/random_util.h                  |   44 +
 be/src/kudu/util/rle-encoding.h                 |  523 ++++
 be/src/kudu/util/rle-test.cc                    |  546 ++++
 be/src/kudu/util/rolling_log-test.cc            |  147 ++
 be/src/kudu/util/rolling_log.cc                 |  285 ++
 be/src/kudu/util/rolling_log.h                  |  128 +
 be/src/kudu/util/rw_mutex-test.cc               |  185 ++
 be/src/kudu/util/rw_mutex.cc                    |  207 ++
 be/src/kudu/util/rw_mutex.h                     |  123 +
 be/src/kudu/util/rw_semaphore-test.cc           |   94 +
 be/src/kudu/util/rw_semaphore.h                 |  206 ++
 be/src/kudu/util/rwc_lock-test.cc               |  147 ++
 be/src/kudu/util/rwc_lock.cc                    |  136 +
 be/src/kudu/util/rwc_lock.h                     |  142 +
 be/src/kudu/util/safe_math-test.cc              |   56 +
 be/src/kudu/util/safe_math.h                    |   69 +
 be/src/kudu/util/scoped_cleanup-test.cc         |   56 +
 be/src/kudu/util/scoped_cleanup.h               |   67 +
 be/src/kudu/util/semaphore.cc                   |  105 +
 be/src/kudu/util/semaphore.h                    |   77 +
 be/src/kudu/util/semaphore_macosx.cc            |   75 +
 be/src/kudu/util/signal.cc                      |   47 +
 be/src/kudu/util/signal.h                       |   42 +
 be/src/kudu/util/slice-test.cc                  |   61 +
 be/src/kudu/util/slice.cc                       |   97 +
 be/src/kudu/util/slice.h                        |  332 +++
 .../util/sorted_disjoint_interval_list-test.cc  |   98 +
 .../kudu/util/sorted_disjoint_interval_list.h   |   95 +
 be/src/kudu/util/spinlock_profiling-test.cc     |   81 +
 be/src/kudu/util/spinlock_profiling.cc          |  308 +++
 be/src/kudu/util/spinlock_profiling.h           |   72 +
 be/src/kudu/util/stack_watchdog-test.cc         |  152 ++
 be/src/kudu/util/status-test.cc                 |  119 +
 be/src/kudu/util/status.cc                      |  170 ++
 be/src/kudu/util/status.h                       |  493 ++++
 be/src/kudu/util/status_callback.cc             |   41 +
 be/src/kudu/util/status_callback.h              |   54 +
 be/src/kudu/util/stopwatch.h                    |  364 +++
 be/src/kudu/util/string_case-test.cc            |   65 +
 be/src/kudu/util/string_case.cc                 |   76 +
 be/src/kudu/util/string_case.h                  |   48 +
 be/src/kudu/util/striped64-test.cc              |  163 ++
 be/src/kudu/util/striped64.cc                   |  191 ++
 be/src/kudu/util/striped64.h                    |  168 ++
 be/src/kudu/util/subprocess-test.cc             |  381 +++
 be/src/kudu/util/subprocess.cc                  |  815 ++++++
 be/src/kudu/util/subprocess.h                   |  219 ++
 be/src/kudu/util/test_graph.cc                  |  121 +
 be/src/kudu/util/test_graph.h                   |   90 +
 be/src/kudu/util/test_macros.h                  |  123 +
 be/src/kudu/util/test_main.cc                   |  109 +
 be/src/kudu/util/test_util.cc                   |  446 ++++
 be/src/kudu/util/test_util.h                    |  146 ++
 be/src/kudu/util/test_util_prod.cc              |   28 +
 be/src/kudu/util/test_util_prod.h               |   32 +
 be/src/kudu/util/thread-test.cc                 |  160 ++
 be/src/kudu/util/thread.cc                      |  628 +++++
 be/src/kudu/util/thread.h                       |  373 +++
 be/src/kudu/util/thread_restrictions.cc         |   87 +
 be/src/kudu/util/thread_restrictions.h          |  121 +
 be/src/kudu/util/threadlocal.cc                 |   89 +
 be/src/kudu/util/threadlocal.h                  |  128 +
 be/src/kudu/util/threadlocal_cache.h            |  110 +
 be/src/kudu/util/threadpool-test.cc             |  941 +++++++
 be/src/kudu/util/threadpool.cc                  |  766 ++++++
 be/src/kudu/util/threadpool.h                   |  505 ++++
 be/src/kudu/util/throttler-test.cc              |   76 +
 be/src/kudu/util/throttler.cc                   |   67 +
 be/src/kudu/util/throttler.h                    |   62 +
 be/src/kudu/util/trace-test.cc                  |  891 +++++++
 be/src/kudu/util/trace.cc                       |  259 ++
 be/src/kudu/util/trace.h                        |  292 +++
 be/src/kudu/util/trace_metrics.cc               |   74 +
 be/src/kudu/util/trace_metrics.h                |   89 +
 be/src/kudu/util/url-coding-test.cc             |  112 +
 be/src/kudu/util/url-coding.cc                  |  208 ++
 be/src/kudu/util/url-coding.h                   |   69 +
 be/src/kudu/util/user-test.cc                   |   44 +
 be/src/kudu/util/user.cc                        |   90 +
 be/src/kudu/util/user.h                         |   32 +
 be/src/kudu/util/version_info.cc                |   84 +
 be/src/kudu/util/version_info.h                 |   51 +
 be/src/kudu/util/version_info.proto             |   32 +
 be/src/kudu/util/version_util-test.cc           |   66 +
 be/src/kudu/util/version_util.cc                |   83 +
 be/src/kudu/util/version_util.h                 |   58 +
 be/src/kudu/util/web_callback_registry.h        |  129 +
 be/src/kudu/util/website_util.cc                |   43 +
 be/src/kudu/util/website_util.h                 |   35 +
 be/src/kudu/util/zlib.cc                        |  127 +
 be/src/kudu/util/zlib.h                         |   39 +
 450 files changed, 99139 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt
new file mode 100644
index 0000000..f8cdb02
--- /dev/null
+++ b/be/src/kudu/rpc/CMakeLists.txt
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#### Global header protobufs
+PROTOBUF_GENERATE_CPP(
+  RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rpc_header.proto)
+ADD_EXPORTABLE_LIBRARY(rpc_header_proto
+  SRCS ${RPC_HEADER_PROTO_SRCS}
+  DEPS protobuf pb_util_proto token_proto
+  NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
+  RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rpc_introspection.proto)
+set(RPC_INTROSPECTION_PROTO_LIBS
+  rpc_header_proto
+  protobuf)
+ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
+  SRCS ${RPC_INTROSPECTION_PROTO_SRCS}
+  DEPS ${RPC_INTROSPECTION_PROTO_LIBS}
+  NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS})
+
+### RPC library
+set(KRPC_SRCS
+    acceptor_pool.cc
+    blocking_ops.cc
+    client_negotiation.cc
+    connection.cc
+    connection_id.cc
+    constants.cc
+    inbound_call.cc
+    messenger.cc
+    negotiation.cc
+    outbound_call.cc
+    periodic.cc
+    proxy.cc
+    reactor.cc
+    remote_method.cc
+    remote_user.cc
+    request_tracker.cc
+    result_tracker.cc
+    rpc.cc
+    rpc_context.cc
+    rpc_controller.cc
+    rpc_sidecar.cc
+    rpcz_store.cc
+    sasl_common.cc
+    sasl_helper.cc
+    serialization.cc
+    server_negotiation.cc
+    service_if.cc
+    service_pool.cc
+    service_queue.cc
+    user_credentials.cc
+    transfer.cc
+)
+
+set(KRPC_LIBS
+  cyrus_sasl
+  gssapi_krb5
+  gutil
+  kudu_util
+  libev
+  rpc_header_proto
+  rpc_introspection_proto
+  security)
+
+ADD_EXPORTABLE_LIBRARY(krpc
+  SRCS ${KRPC_SRCS}
+  DEPS ${KRPC_LIBS})
+
+### RPC generator tool
+add_executable(protoc-gen-krpc protoc-gen-krpc.cc)
+target_link_libraries(protoc-gen-krpc
+    ${KUDU_BASE_LIBS}
+    rpc_header_proto
+    protoc
+    protobuf
+    gutil
+    kudu_util)
+
+#### RPC test
+PROTOBUF_GENERATE_CPP(
+  RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rtest_diff_package.proto)
+add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS})
+target_link_libraries(rtest_diff_package_proto rpc_header_proto)
+
+KRPC_GENERATE(
+  RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES rtest.proto)
+add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS})
+target_link_libraries(rtest_krpc
+  krpc
+  rpc_header_proto
+  rtest_diff_package_proto)
+
+# Tests
+set(KUDU_TEST_LINK_LIBS
+  krpc
+  mini_kdc
+  rpc_header_proto
+  rtest_krpc
+  security_test_util
+  ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(exactly_once_rpc-test PROCESSORS 10)
+ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
+ADD_KUDU_TEST(periodic-test)
+ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
+ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
+ADD_KUDU_TEST(rpc-test)
+ADD_KUDU_TEST(rpc_stub-test)
+ADD_KUDU_TEST(service_queue-test RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc
new file mode 100644
index 0000000..e4bcbd1
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.cc
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/acceptor_pool.h"
+
+#include <string>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace google {
+namespace protobuf {
+
+class Message;
+
+}
+}
+
+using google::protobuf::Message;
+using std::string;
+
+METRIC_DEFINE_counter(server, rpc_connections_accepted,
+                      "RPC Connections Accepted",
+                      kudu::MetricUnit::kConnections,
+                      "Number of incoming TCP connections made to the RPC server");
+
+DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+             "Socket backlog parameter used when listening for RPC connections. "
+             "This defines the maximum length to which the queue of pending "
+             "TCP connections inbound to the RPC server may grow. If a connection "
+             "request arrives when the queue is full, the client may receive "
+             "an error. Higher values may help the server ride over bursts of "
+             "new inbound connection requests.");
+TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
+
+namespace kudu {
+namespace rpc {
+
+AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
+                           Sockaddr bind_address)
+    : messenger_(messenger),
+      socket_(socket->Release()),
+      bind_address_(bind_address),
+      rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(
+          messenger->metric_entity())),
+      closing_(false) {}
+
+AcceptorPool::~AcceptorPool() {
+  Shutdown();
+}
+
+Status AcceptorPool::Start(int num_threads) {
+  RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    Status s = kudu::Thread::Create("acceptor pool", "acceptor",
+        &AcceptorPool::RunThread, this, &new_thread);
+    if (!s.ok()) {
+      Shutdown();
+      return s;
+    }
+    threads_.push_back(new_thread);
+  }
+  return Status::OK();
+}
+
+void AcceptorPool::Shutdown() {
+  if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
+    VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
+            << " already shut down";
+    return;
+  }
+
+#if defined(__linux__)
+  // Closing the socket will break us out of accept() if we're in it, and
+  // prevent future accepts.
+  WARN_NOT_OK(socket_.Shutdown(true, true),
+              strings::Substitute("Could not shut down acceptor socket on $0",
+                                  bind_address_.ToString()));
+#else
+  // Calling shutdown on an accepting (non-connected) socket is illegal on most
+  // platforms (but not Linux). Instead, the accepting threads are interrupted
+  // forcefully.
+  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+    pthread_cancel(thread.get()->pthread_id());
+  }
+#endif
+
+  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  }
+  threads_.clear();
+
+  // Close the socket: keeping the descriptor open and, possibly, receiving late
+  // not-to-be-read messages from the peer does not make much sense. The
+  // Socket::Close() method is called upon destruction of the aggregated socket_
+  // object as well. However, the typical ownership pattern of an AcceptorPool
+  // object includes two references wrapped via a shared_ptr smart pointer: one
+  // is held by Messenger, another by RpcServer. If not calling Socket::Close()
+  // here, it would  necessary to wait until Messenger::Shutdown() is called for
+  // the corresponding messenger object to close this socket.
+  ignore_result(socket_.Close());
+}
+
+Sockaddr AcceptorPool::bind_address() const {
+  return bind_address_;
+}
+
+Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const {
+  return socket_.GetSocketAddress(addr);
+}
+
+int64_t AcceptorPool::num_rpc_connections_accepted() const {
+  return rpc_connections_accepted_->value();
+}
+
+void AcceptorPool::RunThread() {
+  while (true) {
+    Socket new_sock;
+    Sockaddr remote;
+    VLOG(2) << "calling accept() on socket " << socket_.GetFd()
+            << " listening on " << bind_address_.ToString();
+    Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+    if (!s.ok()) {
+      if (Release_Load(&closing_)) {
+        break;
+      }
+      KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString()
+                                    << THROTTLE_MSG;
+      continue;
+    }
+    s = new_sock.SetNoDelay(true);
+    if (!s.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString()
+          << " failed to set TCP_NODELAY on a newly accepted socket: "
+          << s.ToString() << THROTTLE_MSG;
+      continue;
+    }
+    rpc_connections_accepted_->Increment();
+    messenger_->RegisterInboundSocket(&new_sock, remote);
+  }
+  VLOG(1) << "AcceptorPool shutting down.";
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h
new file mode 100644
index 0000000..ba1996a
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_ACCEPTOR_POOL_H
+#define KUDU_RPC_ACCEPTOR_POOL_H
+
+#include <stdint.h>
+#include <vector>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Thread;
+
+namespace rpc {
+
+class Messenger;
+
+// A pool of threads calling accept() to create new connections.
+// Acceptor pool threads terminate when they notice that the messenger has been
+// shut down, if Shutdown() is called, or if the pool object is destructed.
+class AcceptorPool {
+ public:
+  // Create a new acceptor pool.  Calls socket::Release to take ownership of the
+  // socket.
+  // 'socket' must be already bound, but should not yet be listening.
+  AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+  ~AcceptorPool();
+
+  // Start listening and accepting connections.
+  Status Start(int num_threads);
+  void Shutdown();
+
+  // Return the address that the pool is bound to. If the port is specified as
+  // 0, then this will always return port 0.
+  Sockaddr bind_address() const;
+
+  // Return the address that the pool is bound to. This only works while the
+  // socket is open, and if the specified port is 0 then this will return the
+  // actual port that was bound.
+  Status GetBoundAddress(Sockaddr* addr) const;
+
+  // Return the number of connections accepted by this messenger. Thread-safe.
+  int64_t num_rpc_connections_accepted() const;
+
+ private:
+  void RunThread();
+
+  Messenger *messenger_;
+  Socket socket_;
+  Sockaddr bind_address_;
+  std::vector<scoped_refptr<kudu::Thread> > threads_;
+
+  scoped_refptr<Counter> rpc_connections_accepted_;
+
+  Atomic32 closing_;
+
+  DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc
new file mode 100644
index 0000000..f5cd644
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.cc
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/blocking_ops.h"
+
+#include <cstdint>
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::MessageLite;
+
+const char kHTTPHeader[] = "HTTP";
+
+Status CheckInBlockingMode(const Socket* sock) {
+  bool is_nonblocking;
+  RETURN_NOT_OK(sock->IsNonBlocking(&is_nonblocking));
+  if (is_nonblocking) {
+    static const char* const kErrMsg = "socket is not in blocking mode";
+    LOG(DFATAL) << kErrMsg;
+    return Status::IllegalState(kErrMsg);
+  }
+  return Status::OK();
+}
+
+Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg,
+    const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(header.IsInitialized()) << "header protobuf must be initialized";
+  DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized";
+
+  // Ensure we are in blocking mode.
+  // These blocking calls are typically not in the fast path, so doing this for all build types.
+  RETURN_NOT_OK(CheckInBlockingMode(sock));
+
+  // Serialize message
+  faststring param_buf;
+  serialization::SerializeMessage(msg, &param_buf);
+
+  // Serialize header and initial length
+  faststring header_buf;
+  serialization::SerializeHeader(header, param_buf.size(), &header_buf);
+
+  // Write header & param to stream
+  size_t nsent;
+  RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline));
+  RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline));
+
+  return Status::OK();
+}
+
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+    MessageLite* header, Slice* param_buf, const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(recv_buf != nullptr);
+  DCHECK(header != nullptr);
+  DCHECK(param_buf != nullptr);
+
+  RETURN_NOT_OK(CheckInBlockingMode(sock));
+
+  // Read the message prefix, which specifies the length of the payload.
+  recv_buf->clear();
+  recv_buf->resize(kMsgLengthPrefixLength);
+  size_t recvd = 0;
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength, &recvd, deadline));
+  uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data());
+
+  // Verify that the payload size isn't out of bounds.
+  // This can happen because of network corruption, or a naughty client.
+  if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) {
+    // A common user mistake is to try to speak the Kudu RPC protocol to an
+    // HTTP endpoint, or vice versa.
+    if (memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0) {
+      return Status::IOError(
+          "received invalid RPC message which appears to be an HTTP response. "
+          "Verify that you have specified a valid RPC port and not an HTTP port.");
+    }
+
+    return Status::IOError(
+        strings::Substitute(
+            "received invalid message of size $0 which exceeds"
+            " the rpc_max_message_size of $1 bytes",
+            payload_len, FLAGS_rpc_max_message_size));
+  }
+
+  // Read the message payload.
+  recvd = 0;
+  recv_buf->resize(payload_len + kMsgLengthPrefixLength);
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength,
+                payload_len, &recvd, deadline));
+  RETURN_NOT_OK(serialization::ParseMessage(Slice(*recv_buf), header, param_buf));
+  return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h
new file mode 100644
index 0000000..b305ba7
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_BLOCKING_OPS_H
+#define KUDU_RPC_BLOCKING_OPS_H
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class faststring;
+class MonoTime;
+class Slice;
+class Socket;
+class Status;
+
+namespace rpc {
+
+// Returns OK if socket is in blocking mode. Otherwise, returns an error.
+Status CheckInBlockingMode(const Socket* sock);
+
+// Encode and send a message over a socket.
+// header: Request or Response header protobuf.
+// msg: Protobuf message to send. This message must be fully initialized.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header,
+    const google::protobuf::MessageLite& msg, const MonoTime& deadline);
+
+// Receive a full message frame from the server.
+// recv_buf: buffer to use for reading the data from the socket.
+// header: Request or Response header protobuf.
+// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+    google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline);
+
+} // namespace rpc
+} // namespace kudu
+
+#endif  // KUDU_RPC_BLOCKING_OPS_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..02175f6
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,853 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <cstdint>
+#include <cstring>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <set>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gssapi/gssapi.h>
+#include <gssapi/gssapi_krb5.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::unique_ptr;
+
+using strings::Substitute;
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
+namespace kudu {
+namespace rpc {
+
+static int ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation,
+                                     const char* plugin_name,
+                                     const char* option,
+                                     const char** result,
+                                     unsigned* len) {
+  return client_negotiation->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation,
+                                     int id,
+                                     const char** result,
+                                     unsigned* len) {
+  return client_negotiation->SimpleCb(id, result, len);
+}
+
+static int ClientNegotiationSecretCb(sasl_conn_t* conn,
+                                     ClientNegotiation* client_negotiation,
+                                     int id,
+                                     sasl_secret_t** psecret) {
+  return client_negotiation->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+  if (PREDICT_FALSE(!error.has_code())) {
+    return Status::RuntimeError(error.message());
+  }
+  const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+  switch (error.code()) {
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN:
+      return Status::NotAuthorized(code_name, error.message());
+    case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+      return Status::ServiceUnavailable(code_name, error.message());
+    default:
+      return Status::RuntimeError(code_name, error.message());
+  }
+}
+
+ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
+                                     const security::TlsContext* tls_context,
+                                     boost::optional<security::SignedTokenPB> authn_token,
+                                     RpcEncryption encryption,
+                                     std::string sasl_proto_name)
+    : socket_(std::move(socket)),
+      helper_(SaslHelper::CLIENT),
+      tls_context_(tls_context),
+      encryption_(encryption),
+      tls_negotiated_(false),
+      authn_token_(std::move(authn_token)),
+      psecret_(nullptr, std::free),
+      negotiated_authn_(AuthenticationType::INVALID),
+      negotiated_mech_(SaslMechanism::INVALID),
+      sasl_proto_name_(std::move(sasl_proto_name)),
+      deadline_(MonoTime::Max()) {
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+      reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+      reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+      reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+  DCHECK(socket_);
+  DCHECK(tls_context_);
+}
+
+Status ClientNegotiation::EnablePlain(const string& user, const string& pass) {
+  RETURN_NOT_OK(helper_.EnablePlain());
+  plain_auth_user_ = user;
+  plain_pass_ = pass;
+  return Status::OK();
+}
+
+Status ClientNegotiation::EnableGSSAPI() {
+  return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const {
+  return negotiated_mech_;
+}
+
+void ClientNegotiation::set_server_fqdn(const string& domain_name) {
+  helper_.set_server_fqdn(domain_name);
+}
+
+void ClientNegotiation::set_deadline(const MonoTime& deadline) {
+  deadline_ = deadline;
+}
+
+Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
+  TRACE("Beginning negotiation");
+
+  // Ensure we can use blocking calls on the socket during negotiation.
+  RETURN_NOT_OK(CheckInBlockingMode(socket_.get()));
+
+  // Step 1: send the connection header.
+  RETURN_NOT_OK(SendConnectionHeader());
+
+  faststring recv_buf;
+
+  { // Step 2: send and receive the NEGOTIATE step messages.
+    RETURN_NOT_OK(SendNegotiate());
+    NegotiatePB response;
+    RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+    RETURN_NOT_OK(HandleNegotiate(response));
+    TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_));
+  }
+
+  // Step 3: if both ends support TLS, do a TLS handshake.
+  // TODO(KUDU-1921): allow the client to require TLS.
+  if (encryption_ != RpcEncryption::DISABLED &&
+      ContainsKey(server_features_, TLS)) {
+    RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
+                                                  &tls_handshake_));
+
+    if (negotiated_authn_ == AuthenticationType::SASL) {
+      // When using SASL authentication, verifying the server's certificate is
+      // not necessary. This allows the client to still use TLS encryption for
+      // connections to servers which only have a self-signed certificate.
+      tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+    }
+
+    // To initiate the TLS handshake, we pretend as if the server sent us an
+    // empty TLS_HANDSHAKE token.
+    NegotiatePB initial;
+    initial.set_step(NegotiatePB::TLS_HANDSHAKE);
+    initial.set_tls_handshake("");
+    Status s = HandleTlsHandshake(initial);
+
+    while (s.IsIncomplete()) {
+      NegotiatePB response;
+      RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+      s = HandleTlsHandshake(response);
+    }
+    RETURN_NOT_OK(s);
+    tls_negotiated_ = true;
+  }
+
+  // Step 4: Authentication
+  switch (negotiated_authn_) {
+    case AuthenticationType::SASL:
+      RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error));
+      break;
+    case AuthenticationType::TOKEN:
+      RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
+      break;
+    case AuthenticationType::CERTIFICATE:
+      // The TLS handshake has already authenticated the server.
+      break;
+    case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+  }
+
+  // Step 5: Send connection context.
+  RETURN_NOT_OK(SendConnectionContext());
+
+  TRACE("Negotiation successful");
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
+  RequestHeader header;
+  header.set_call_id(kNegotiateCallId);
+
+  DCHECK(socket_);
+  DCHECK(msg.IsInitialized()) << "message must be initialized";
+  DCHECK(msg.has_step()) << "message must have a step";
+
+  TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step()));
+  return SendFramedMessageBlocking(socket(), header, msg, deadline_);
+}
+
+Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg,
+                                          faststring* buffer,
+                                          unique_ptr<ErrorStatusPB>* rpc_error) {
+  ResponseHeader header;
+  Slice param_buf;
+  RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, &param_buf, deadline_));
+  RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id()));
+
+  if (header.is_error()) {
+    return ParseError(param_buf, rpc_error);
+  }
+
+  RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg));
+  TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step()));
+  return Status::OK();
+}
+
+Status ClientNegotiation::ParseError(const Slice& err_data,
+                                     unique_ptr<ErrorStatusPB>* rpc_error) {
+  unique_ptr<ErrorStatusPB> error(new ErrorStatusPB);
+  if (!error->ParseFromArray(err_data.data(), err_data.size())) {
+    return Status::IOError("invalid error response, missing fields",
+                           error->InitializationErrorString());
+  }
+  Status s = StatusFromRpcError(*error);
+  TRACE("Received error response from server: $0", s.ToString());
+
+  if (rpc_error) {
+    rpc_error->swap(error);
+  }
+  return s;
+}
+
+Status ClientNegotiation::SendConnectionHeader() {
+  const uint8_t buflen = kMagicNumberLength + kHeaderFlagsLength;
+  uint8_t buf[buflen];
+  serialization::SerializeConnHeader(buf);
+  size_t nsent;
+  return socket()->BlockingWrite(buf, buflen, &nsent, deadline_);
+}
+
+Status ClientNegotiation::InitSaslClient() {
+  // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
+  unsigned flags = 0;
+
+  sasl_conn_t* sasl_conn = nullptr;
+  RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+      return sasl_client_new(
+          sasl_proto_name_.c_str(),     // Registered name of the service using SASL. Required.
+          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
+          nullptr,                      // Local and remote IP address strings. (we don't use
+          nullptr,                      // any mechanisms which require this info.)
+          &callbacks_[0],               // Connection-specific callbacks.
+          flags,
+          &sasl_conn);
+    }), Substitute("unable to create new SASL $0 client", sasl_proto_name_));
+  sasl_conn_.reset(sasl_conn);
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendNegotiate() {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::NEGOTIATE);
+
+  // Advertise our supported features.
+  client_features_ = kSupportedClientRpcFeatureFlags;
+
+  if (encryption_ != RpcEncryption::DISABLED) {
+    client_features_.insert(TLS);
+    // If the remote peer is local, then we allow using TLS for authentication
+    // without encryption or integrity.
+    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+      client_features_.insert(TLS_AUTHENTICATION_ONLY);
+    }
+  }
+
+  for (RpcFeatureFlag feature : client_features_) {
+    msg.add_supported_features(feature);
+  }
+
+  if (!helper_.EnabledMechs().empty()) {
+    msg.add_authn_types()->mutable_sasl();
+  }
+  if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) {
+    // We only provide authenticated TLS if the certificates are generated
+    // by the internal CA.
+    msg.add_authn_types()->mutable_certificate();
+  }
+  if (authn_token_ && tls_context_->has_trusted_cert()) {
+    // TODO(KUDU-1924): check that the authn token is not expired. Can this be done
+    // reliably on clients?
+    msg.add_authn_types()->mutable_token();
+  }
+
+  if (PREDICT_FALSE(msg.authn_types().empty())) {
+    return Status::NotAuthorized("client is not configured with an authentication type");
+  }
+
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  return Status::OK();
+}
+
+Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) {
+    return Status::NotAuthorized("expected NEGOTIATE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received NEGOTIATE response from server");
+
+  // Fill in the set of features supported by the server.
+  for (int flag : response.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (feature_flag != UNKNOWN) {
+      server_features_.insert(feature_flag);
+    }
+  }
+
+  if (encryption_ == RpcEncryption::REQUIRED &&
+      !ContainsKey(server_features_, RpcFeatureFlag::TLS)) {
+    return Status::NotAuthorized("server does not support required TLS encryption");
+  }
+
+  // Get the authentication type which the server would like to use.
+  DCHECK_LE(response.authn_types().size(), 1);
+  if (response.authn_types().empty()) {
+    // If the server doesn't send back an authentication type, default to SASL
+    // in order to maintain backwards compatibility.
+    negotiated_authn_ = AuthenticationType::SASL;
+  } else {
+    const auto& authn_type = response.authn_types(0);
+    switch (authn_type.type_case()) {
+      case AuthenticationTypePB::kSasl:
+        negotiated_authn_ = AuthenticationType::SASL;
+        break;
+      case AuthenticationTypePB::kToken:
+        // TODO(todd): we should also be checking tls_context_->has_trusted_cert()
+        // here to match the original logic we used to advertise TOKEN support,
+        // or perhaps just check explicitly whether we advertised TOKEN.
+        if (!authn_token_) {
+          return Status::RuntimeError(
+              "server chose token authentication, but client has no token");
+        }
+        negotiated_authn_ = AuthenticationType::TOKEN;
+        return Status::OK();
+      case AuthenticationTypePB::kCertificate:
+        if (!tls_context_->has_signed_cert()) {
+          return Status::RuntimeError(
+              "server chose certificate authentication, but client has no certificate");
+        }
+        negotiated_authn_ = AuthenticationType::CERTIFICATE;
+        return Status::OK();
+      case AuthenticationTypePB::TYPE_NOT_SET:
+        return Status::RuntimeError("server chose an unknown authentication type");
+    }
+  }
+
+  DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL);
+
+  // Build a map of the SASL mechanisms offered by the server.
+  set<SaslMechanism::Type> client_mechs(helper_.EnabledMechs());
+  set<SaslMechanism::Type> server_mechs;
+  for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) {
+    auto mech = SaslMechanism::value_of(sasl_mech.mechanism());
+    if (mech == SaslMechanism::INVALID) {
+      continue;
+    }
+    server_mechs.insert(mech);
+  }
+
+  // Determine which SASL mechanism to use for authenticating the connection.
+  // We pick the most preferred mechanism which is supported by both parties.
+  // The preference list in order of most to least preferred:
+  //  * GSSAPI
+  //  * PLAIN
+  //
+  // TODO(KUDU-1921): allow the client to require authentication.
+  if (ContainsKey(client_mechs, SaslMechanism::GSSAPI) &&
+      ContainsKey(server_mechs, SaslMechanism::GSSAPI)) {
+
+    // Check that the client has local Kerberos credentials, and if not fall
+    // back to an alternate mechanism.
+    Status s = CheckGSSAPI();
+    if (s.ok()) {
+      negotiated_mech_ = SaslMechanism::GSSAPI;
+      return Status::OK();
+    }
+
+    TRACE("Kerberos authentication credentials are not available: $0", s.ToString());
+    client_mechs.erase(SaslMechanism::GSSAPI);
+  }
+
+  if (ContainsKey(client_mechs, SaslMechanism::PLAIN) &&
+      ContainsKey(server_mechs, SaslMechanism::PLAIN)) {
+    negotiated_mech_ = SaslMechanism::PLAIN;
+    return Status::OK();
+  }
+
+  // There are no mechanisms in common.
+  if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+      !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+    return Status::NotAuthorized("server requires authentication, "
+                                  "but client does not have Kerberos credentials available");
+  }
+  if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+      ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+    return Status::NotAuthorized("client requires authentication, "
+                                  "but server does not have Kerberos enabled");
+  }
+  string msg = Substitute("client/server supported SASL mechanism mismatch; "
+                          "client mechanisms: [$0], server mechanisms: [$1]",
+                          JoinMapped(client_mechs, SaslMechanism::name_of, ", "),
+                          JoinMapped(server_mechs, SaslMechanism::name_of, ", "));
+
+  // For now, there should never be a SASL mechanism mismatch that isn't due
+  // to one of the sides requiring Kerberos and the other not having it, so
+  // lets sanity check that.
+  DCHECK(STLSetIntersection(client_mechs, server_mechs).empty()) << msg;
+  return Status::NotAuthorized(msg);
+}
+
+Status ClientNegotiation::SendTlsHandshake(string tls_token) {
+  TRACE("Sending TLS_HANDSHAKE message to server");
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::TLS_HANDSHAKE);
+  msg.mutable_tls_handshake()->swap(tls_token);
+  return SendNegotiatePB(msg);
+}
+
+Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) {
+    return Status::NotAuthorized("expected TLS_HANDSHAKE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received TLS_HANDSHAKE response from server");
+
+  if (PREDICT_FALSE(!response.has_tls_handshake())) {
+    return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server");
+  }
+
+  string token;
+  Status s = tls_handshake_.Continue(response.tls_handshake(), &token);
+  if (s.IsIncomplete()) {
+    // Another roundtrip is required to complete the handshake.
+    RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+  }
+
+  // Check that the handshake step didn't produce an error. Will also propagate
+  // an Incomplete status.
+  RETURN_NOT_OK(s);
+
+  // TLS handshake is finished.
+  if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
+      ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
+    TRACE("Negotiated auth-only $0 with cipher $1",
+          tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription());
+    return tls_handshake_.FinishNoWrap(*socket_);
+  }
+
+  TRACE("Negotiated $0 with cipher $1",
+        tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription());
+  return tls_handshake_.Finish(&socket_);
+}
+
+Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
+                                             unique_ptr<ErrorStatusPB>* rpc_error) {
+  RETURN_NOT_OK(InitSaslClient());
+  Status s = SendSaslInitiate();
+
+  // HandleSasl[Initiate, Challenge] return incomplete if an additional
+  // challenge step is required, or OK if a SASL_SUCCESS message is expected.
+  while (s.IsIncomplete()) {
+    NegotiatePB challenge;
+    RETURN_NOT_OK(RecvNegotiatePB(&challenge, recv_buf, rpc_error));
+    s = HandleSaslChallenge(challenge);
+  }
+
+  // Propagate failure from SendSaslInitiate or HandleSaslChallenge.
+  RETURN_NOT_OK(s);
+
+  // Server challenges are over; we now expect the success message.
+  NegotiatePB success;
+  RETURN_NOT_OK(RecvNegotiatePB(&success, recv_buf, rpc_error));
+  return HandleSaslSuccess(success);
+}
+
+Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
+                                              unique_ptr<ErrorStatusPB>* rpc_error) {
+  // Sanity check that TLS has been negotiated. Sending the token on an
+  // unencrypted channel is a big no-no.
+  CHECK(tls_negotiated_);
+
+  // Send the token to the server.
+  NegotiatePB pb;
+  pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+  *pb.mutable_authn_token() = std::move(*authn_token_);
+  RETURN_NOT_OK(SendNegotiatePB(pb));
+  pb.Clear();
+
+  // Check that the server responds with a non-error TOKEN_EXCHANGE message.
+  RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf, rpc_error));
+  if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+    return Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+                                 NegotiatePB::NegotiateStep_Name(pb.step()));
+  }
+
+  return Status::OK();
+}
+
+Status ClientNegotiation::SendSaslInitiate() {
+  TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_));
+
+  // At this point we've already chosen the SASL mechanism to use
+  // (negotiated_mech_), but we need to let the SASL library know. SASL likes to
+  // choose the mechanism from among a list of possible options, so we simply
+  // provide it one option, and then check that it picks that option.
+
+  const char* init_msg = nullptr;
+  unsigned init_msg_len = 0;
+  const char* negotiated_mech = nullptr;
+
+  /* select a mechanism for a connection
+   *  mechlist      -- mechanisms server has available (punctuation ignored)
+   * output:
+   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
+   *  clientout     -- the initial client response to send to the server
+   *  mech          -- set to mechanism name
+   *
+   * Returns:
+   *  SASL_OK       -- success
+   *  SASL_CONTINUE -- negotiation required
+   *  SASL_NOMEM    -- not enough memory
+   *  SASL_NOMECH   -- no mechanism meets requested properties
+   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
+   */
+  TRACE("Calling sasl_client_start()");
+  const Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_start(
+          sasl_conn_.get(),                         // The SASL connection context created by init()
+          SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate.
+          nullptr,                                  // Disables INTERACT return if NULL.
+          &init_msg,                                // Filled in on success.
+          &init_msg_len,                            // Filled in on success.
+          &negotiated_mech);                        // Filled in on success.
+  });
+
+  if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+    return s;
+  }
+
+  // Check that the SASL library is using the mechanism that we picked.
+  DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_);
+
+  // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
+  // integrity protection so that the channel bindings and nonce can be
+  // verified.
+  if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+    RETURN_NOT_OK(EnableProtection(sasl_conn_.get(), SaslProtection::kIntegrity));
+  }
+
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::SASL_INITIATE);
+  msg.mutable_token()->assign(init_msg, init_msg_len);
+  msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech);
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  return s;
+}
+
+Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) {
+  NegotiatePB reply;
+  reply.set_step(NegotiatePB::SASL_RESPONSE);
+  reply.mutable_token()->assign(resp_msg, resp_msg_len);
+  return SendNegotiatePB(reply);
+}
+
+Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_CHALLENGE)) {
+    return Status::NotAuthorized("expected SASL_CHALLENGE step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received SASL_CHALLENGE response from server");
+  if (PREDICT_FALSE(!response.has_token())) {
+    return Status::NotAuthorized("no token in SASL_CHALLENGE response from server");
+  }
+
+  const char* out = nullptr;
+  unsigned out_len = 0;
+  const Status s = DoSaslStep(response.token(), &out, &out_len);
+  if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+    return s;
+  }
+
+  RETURN_NOT_OK(SendSaslResponse(out, out_len));
+  return s;
+}
+
+Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) {
+    return Status::NotAuthorized("expected SASL_SUCCESS step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received SASL_SUCCESS response from server");
+
+  if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+    if (response.has_nonce()) {
+      // Grab the nonce from the server, if it has sent one. We'll send it back
+      // later with SASL integrity protection as part of the connection context.
+      nonce_ = response.nonce();
+    }
+
+    if (tls_negotiated_) {
+      // Check the channel bindings provided by the server against the expected channel bindings.
+      if (!response.has_channel_bindings()) {
+        return Status::NotAuthorized("no channel bindings provided by server");
+      }
+
+      security::Cert cert;
+      RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert));
+
+      string expected_channel_bindings;
+      RETURN_NOT_OK_PREPEND(cert.GetServerEndPointChannelBindings(&expected_channel_bindings),
+                            "failed to generate channel bindings");
+
+      Slice received_channel_bindings;
+      RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(),
+                                       response.channel_bindings(),
+                                       &received_channel_bindings),
+                            "failed to decode channel bindings");
+
+      if (expected_channel_bindings != received_channel_bindings) {
+        Sockaddr addr;
+        ignore_result(socket_->GetPeerAddress(&addr));
+
+        LOG(WARNING) << "Received invalid channel bindings from server "
+                    << addr.ToString()
+                    << ", this could indicate an active network man-in-the-middle";
+        return Status::NotAuthorized("channel bindings do not match");
+      }
+    }
+  }
+
+  return Status::OK();
+}
+
+Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+  TRACE("Calling sasl_client_step()");
+
+  return WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
+  });
+}
+
+Status ClientNegotiation::SendConnectionContext() {
+  TRACE("Sending connection context");
+  RequestHeader header;
+  header.set_call_id(kConnectionContextCallId);
+
+  ConnectionContextPB conn_context;
+  // This field is deprecated but used by servers <Kudu 1.1. Newer server versions ignore
+  // this and use the SASL-provided username instead.
+  conn_context.mutable_deprecated_user_info()->set_real_user(
+      plain_auth_user_.empty() ? "cpp-client" : plain_auth_user_);
+
+  if (nonce_) {
+    // Reply with the SASL-protected nonce. We only set the nonce when using SASL GSSAPI.
+    Slice ciphertext;
+    RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, &ciphertext));
+    *conn_context.mutable_encoded_nonce() = ciphertext.ToString();
+  }
+
+  return SendFramedMessageBlocking(socket(), header, conn_context, deadline_);
+}
+
+int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option,
+                            const char** result, unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  if (PREDICT_FALSE(result == nullptr)) {
+    LOG(DFATAL) << "result outparam is NULL";
+    return SASL_BADPARAM;
+  }
+  switch (id) {
+    // TODO(unknown): Support impersonation?
+    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+    case SASL_CB_USER:
+      TRACE("callback for SASL_CB_USER");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_AUTHNAME:
+      TRACE("callback for SASL_CB_AUTHNAME");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_LANGUAGE:
+      LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE"
+        << "(" << id << ")";
+      return SASL_BADPARAM;
+    default:
+      LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  switch (id) {
+    case SASL_CB_PASS: {
+      if (!conn || !psecret) return SASL_BADPARAM;
+
+      size_t len = plain_pass_.length();
+      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
+      if (!*psecret) {
+        return SASL_NOMEM;
+      }
+      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
+      (*psecret)->len = len;
+      memcpy((*psecret)->data, plain_pass_.c_str(), len + 1);
+      break;
+    }
+    default:
+      LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+namespace {
+// Retrieve the GSSAPI error description for an error code and type.
+string gss_error_description(OM_uint32 code, int type) {
+  string description;
+  OM_uint32 message_context = 0;
+
+  do {
+    if (!description.empty()) {
+      description.append(": ");
+    }
+    OM_uint32 minor = 0;
+    gss_buffer_desc buf;
+    gss_display_status(&minor, code, type, GSS_C_NULL_OID, &message_context, &buf);
+    description.append(static_cast<const char*>(buf.value), buf.length);
+    gss_release_buffer(&minor, &buf);
+  } while (message_context != 0);
+
+  return description;
+}
+
+// Transforms a GSSAPI major and minor error code into a Kudu Status.
+Status check_gss_error(OM_uint32 major, OM_uint32 minor) {
+    if (GSS_ERROR(major)) {
+      return Status::NotAuthorized(gss_error_description(major, GSS_C_GSS_CODE),
+                                   gss_error_description(minor, GSS_C_MECH_CODE));
+    }
+    return Status::OK();
+}
+} // anonymous namespace
+
+Status ClientNegotiation::CheckGSSAPI() {
+  OM_uint32 major, minor;
+  gss_cred_id_t cred = GSS_C_NO_CREDENTIAL;
+
+  // Acquire the Kerberos credential. This will fail if the client does not have
+  // a Kerberos tgt ticket. In theory it should be sufficient to call
+  // gss_inquire_cred_by_mech, but that causes a memory leak on RHEL 7.
+  major = gss_acquire_cred(&minor,
+                           GSS_C_NO_NAME,
+                           GSS_C_INDEFINITE,
+                           const_cast<gss_OID_set>(gss_mech_set_krb5),
+                           GSS_C_INITIATE,
+                           &cred,
+                           nullptr,
+                           nullptr);
+  Status s = check_gss_error(major, minor);
+
+  // Inspect the Kerberos credential to determine if it is expired. The lifetime
+  // returned from gss_acquire_cred in the RHEL 6 version of krb5 is always 0,
+  // so it has to be done with a separate call to gss_inquire_cred. The lifetime
+  // holds the remaining validity of the tgt in seconds.
+  OM_uint32 lifetime;
+  if (s.ok()) {
+    major = gss_inquire_cred(&minor, cred, nullptr, &lifetime, nullptr, nullptr);
+    s = check_gss_error(major, minor);
+  }
+
+  // Release the credential even if gss_inquire_cred fails.
+  gss_release_cred(&minor, &cred);
+  RETURN_NOT_OK(s);
+
+  if (lifetime == 0) {
+    return Status::NotAuthorized("Kerberos ticket expired");
+  }
+  return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu