You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:58 UTC
[46/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
IMPALA-7006: Add KRPC folders from kudu@334ecafd
cp -a ~/checkout/kudu/src/kudu/{rpc,util,security} be/src/kudu/
Change-Id: I232db2b4ccf5df9aca87b21dea31bfb2735d1ab7
Reviewed-on: http://gerrit.cloudera.org:8080/10757
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Lars Volker <lv...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fcf190c4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fcf190c4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fcf190c4
Branch: refs/heads/master
Commit: fcf190c4de1fcc291a5356634fd7cd12efa64852
Parents: 39870d4
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jul 3 15:10:52 2018 -0700
Committer: Lars Volker <lv...@cloudera.com>
Committed: Thu Jul 12 21:35:42 2018 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/CMakeLists.txt | 138 +
be/src/kudu/rpc/acceptor_pool.cc | 175 ++
be/src/kudu/rpc/acceptor_pool.h | 84 +
be/src/kudu/rpc/blocking_ops.cc | 126 +
be/src/kudu/rpc/blocking_ops.h | 58 +
be/src/kudu/rpc/client_negotiation.cc | 853 ++++++
be/src/kudu/rpc/client_negotiation.h | 263 ++
be/src/kudu/rpc/connection.cc | 767 ++++++
be/src/kudu/rpc/connection.h | 391 +++
be/src/kudu/rpc/connection_id.cc | 85 +
be/src/kudu/rpc/connection_id.h | 84 +
be/src/kudu/rpc/constants.cc | 37 +
be/src/kudu/rpc/constants.h | 60 +
be/src/kudu/rpc/exactly_once_rpc-test.cc | 629 +++++
be/src/kudu/rpc/inbound_call.cc | 345 +++
be/src/kudu/rpc/inbound_call.h | 286 ++
be/src/kudu/rpc/messenger.cc | 502 ++++
be/src/kudu/rpc/messenger.h | 460 ++++
be/src/kudu/rpc/mt-rpc-test.cc | 318 +++
be/src/kudu/rpc/negotiation-test.cc | 1346 ++++++++++
be/src/kudu/rpc/negotiation.cc | 327 +++
be/src/kudu/rpc/negotiation.h | 58 +
be/src/kudu/rpc/outbound_call.cc | 531 ++++
be/src/kudu/rpc/outbound_call.h | 348 +++
be/src/kudu/rpc/periodic-test.cc | 295 +++
be/src/kudu/rpc/periodic.cc | 219 ++
be/src/kudu/rpc/periodic.h | 215 ++
be/src/kudu/rpc/protoc-gen-krpc.cc | 691 +++++
be/src/kudu/rpc/proxy.cc | 116 +
be/src/kudu/rpc/proxy.h | 126 +
be/src/kudu/rpc/reactor-test.cc | 112 +
be/src/kudu/rpc/reactor.cc | 918 +++++++
be/src/kudu/rpc/reactor.h | 427 +++
be/src/kudu/rpc/remote_method.cc | 53 +
be/src/kudu/rpc/remote_method.h | 51 +
be/src/kudu/rpc/remote_user.cc | 40 +
be/src/kudu/rpc/remote_user.h | 99 +
be/src/kudu/rpc/request_tracker-test.cc | 86 +
be/src/kudu/rpc/request_tracker.cc | 55 +
be/src/kudu/rpc/request_tracker.h | 87 +
be/src/kudu/rpc/response_callback.h | 31 +
be/src/kudu/rpc/result_tracker.cc | 595 +++++
be/src/kudu/rpc/result_tracker.h | 401 +++
be/src/kudu/rpc/retriable_rpc.h | 296 +++
be/src/kudu/rpc/rpc-bench.cc | 298 +++
be/src/kudu/rpc/rpc-test-base.h | 661 +++++
be/src/kudu/rpc/rpc-test.cc | 1364 ++++++++++
be/src/kudu/rpc/rpc.cc | 101 +
be/src/kudu/rpc/rpc.h | 221 ++
be/src/kudu/rpc/rpc_context.cc | 217 ++
be/src/kudu/rpc/rpc_context.h | 245 ++
be/src/kudu/rpc/rpc_controller.cc | 177 ++
be/src/kudu/rpc/rpc_controller.h | 282 ++
be/src/kudu/rpc/rpc_header.proto | 365 +++
be/src/kudu/rpc/rpc_introspection.proto | 110 +
be/src/kudu/rpc/rpc_service.h | 47 +
be/src/kudu/rpc/rpc_sidecar.cc | 115 +
be/src/kudu/rpc/rpc_sidecar.h | 73 +
be/src/kudu/rpc/rpc_stub-test.cc | 726 ++++++
be/src/kudu/rpc/rpcz_store.cc | 272 ++
be/src/kudu/rpc/rpcz_store.h | 74 +
be/src/kudu/rpc/rtest.proto | 160 ++
be/src/kudu/rpc/rtest_diff_package.proto | 26 +
be/src/kudu/rpc/sasl_common.cc | 470 ++++
be/src/kudu/rpc/sasl_common.h | 158 ++
be/src/kudu/rpc/sasl_helper.cc | 134 +
be/src/kudu/rpc/sasl_helper.h | 109 +
be/src/kudu/rpc/serialization.cc | 223 ++
be/src/kudu/rpc/serialization.h | 88 +
be/src/kudu/rpc/server_negotiation.cc | 989 +++++++
be/src/kudu/rpc/server_negotiation.h | 259 ++
be/src/kudu/rpc/service_if.cc | 160 ++
be/src/kudu/rpc/service_if.h | 134 +
be/src/kudu/rpc/service_pool.cc | 234 ++
be/src/kudu/rpc/service_pool.h | 117 +
be/src/kudu/rpc/service_queue-test.cc | 151 ++
be/src/kudu/rpc/service_queue.cc | 145 ++
be/src/kudu/rpc/service_queue.h | 225 ++
be/src/kudu/rpc/transfer.cc | 283 ++
be/src/kudu/rpc/transfer.h | 212 ++
be/src/kudu/rpc/user_credentials.cc | 64 +
be/src/kudu/rpc/user_credentials.h | 53 +
be/src/kudu/security/CMakeLists.txt | 141 +
be/src/kudu/security/ca/cert_management-test.cc | 294 +++
be/src/kudu/security/ca/cert_management.cc | 423 +++
be/src/kudu/security/ca/cert_management.h | 226 ++
be/src/kudu/security/cert-test.cc | 165 ++
be/src/kudu/security/cert.cc | 301 +++
be/src/kudu/security/cert.h | 119 +
be/src/kudu/security/crypto-test.cc | 257 ++
be/src/kudu/security/crypto.cc | 276 ++
be/src/kudu/security/crypto.h | 103 +
be/src/kudu/security/init.cc | 465 ++++
be/src/kudu/security/init.h | 84 +
be/src/kudu/security/kerberos_util.cc | 37 +
be/src/kudu/security/kerberos_util.h | 29 +
be/src/kudu/security/krb5_realm_override.cc | 105 +
be/src/kudu/security/openssl_util.cc | 322 +++
be/src/kudu/security/openssl_util.h | 217 ++
be/src/kudu/security/openssl_util_bio.h | 129 +
be/src/kudu/security/security-test-util.cc | 103 +
be/src/kudu/security/security-test-util.h | 56 +
be/src/kudu/security/security_flags.cc | 42 +
be/src/kudu/security/security_flags.h | 36 +
be/src/kudu/security/simple_acl.cc | 89 +
be/src/kudu/security/simple_acl.h | 58 +
be/src/kudu/security/test/mini_kdc-test.cc | 144 ++
be/src/kudu/security/test/mini_kdc.cc | 315 +++
be/src/kudu/security/test/mini_kdc.h | 134 +
be/src/kudu/security/test/test_certs.cc | 969 +++++++
be/src/kudu/security/test/test_certs.h | 86 +
be/src/kudu/security/test/test_pass.cc | 40 +
be/src/kudu/security/test/test_pass.h | 37 +
be/src/kudu/security/tls_context.cc | 520 ++++
be/src/kudu/security/tls_context.h | 202 ++
be/src/kudu/security/tls_handshake-test.cc | 390 +++
be/src/kudu/security/tls_handshake.cc | 274 ++
be/src/kudu/security/tls_handshake.h | 171 ++
be/src/kudu/security/tls_socket-test.cc | 366 +++
be/src/kudu/security/tls_socket.cc | 185 ++
be/src/kudu/security/tls_socket.h | 60 +
be/src/kudu/security/token-test.cc | 677 +++++
be/src/kudu/security/token.proto | 97 +
be/src/kudu/security/token_signer.cc | 299 +++
be/src/kudu/security/token_signer.h | 316 +++
be/src/kudu/security/token_signing_key.cc | 110 +
be/src/kudu/security/token_signing_key.h | 103 +
be/src/kudu/security/token_verifier.cc | 173 ++
be/src/kudu/security/token_verifier.h | 126 +
be/src/kudu/security/x509_check_host.cc | 439 ++++
be/src/kudu/security/x509_check_host.h | 50 +
be/src/kudu/util/CMakeLists.txt | 482 ++++
be/src/kudu/util/alignment.h | 28 +
be/src/kudu/util/array_view.h | 133 +
be/src/kudu/util/async_logger.cc | 151 ++
be/src/kudu/util/async_logger.h | 206 ++
be/src/kudu/util/async_util-test.cc | 129 +
be/src/kudu/util/async_util.h | 99 +
be/src/kudu/util/atomic-test.cc | 135 +
be/src/kudu/util/atomic.cc | 56 +
be/src/kudu/util/atomic.h | 322 +++
be/src/kudu/util/auto_release_pool.h | 99 +
be/src/kudu/util/barrier.h | 68 +
be/src/kudu/util/bit-stream-utils.h | 150 ++
be/src/kudu/util/bit-stream-utils.inline.h | 211 ++
be/src/kudu/util/bit-util-test.cc | 45 +
be/src/kudu/util/bit-util.h | 57 +
be/src/kudu/util/bitmap-test.cc | 230 ++
be/src/kudu/util/bitmap.cc | 136 +
be/src/kudu/util/bitmap.h | 219 ++
be/src/kudu/util/blocking_queue-test.cc | 249 ++
be/src/kudu/util/blocking_queue.h | 256 ++
be/src/kudu/util/bloom_filter-test.cc | 92 +
be/src/kudu/util/bloom_filter.cc | 89 +
be/src/kudu/util/bloom_filter.h | 254 ++
be/src/kudu/util/boost_mutex_utils.h | 45 +
be/src/kudu/util/cache-bench.cc | 191 ++
be/src/kudu/util/cache-test.cc | 246 ++
be/src/kudu/util/cache.cc | 572 ++++
be/src/kudu/util/cache.h | 216 ++
be/src/kudu/util/cache_metrics.cc | 69 +
be/src/kudu/util/cache_metrics.h | 42 +
be/src/kudu/util/callback_bind-test.cc | 119 +
be/src/kudu/util/coding-inl.h | 120 +
be/src/kudu/util/coding.cc | 142 +
be/src/kudu/util/coding.h | 113 +
.../kudu/util/compression/compression-test.cc | 90 +
be/src/kudu/util/compression/compression.proto | 29 +
.../kudu/util/compression/compression_codec.cc | 286 ++
.../kudu/util/compression/compression_codec.h | 78 +
be/src/kudu/util/condition_variable.cc | 142 +
be/src/kudu/util/condition_variable.h | 118 +
be/src/kudu/util/countdown_latch-test.cc | 74 +
be/src/kudu/util/countdown_latch.h | 137 +
be/src/kudu/util/cow_object.cc | 34 +
be/src/kudu/util/cow_object.h | 437 ++++
be/src/kudu/util/crc-test.cc | 112 +
be/src/kudu/util/crc.cc | 56 +
be/src/kudu/util/crc.h | 43 +
be/src/kudu/util/curl_util.cc | 130 +
be/src/kudu/util/curl_util.h | 92 +
be/src/kudu/util/debug-util-test.cc | 458 ++++
be/src/kudu/util/debug-util.cc | 800 ++++++
be/src/kudu/util/debug-util.h | 321 +++
be/src/kudu/util/debug/leak_annotations.h | 84 +
be/src/kudu/util/debug/leakcheck_disabler.h | 48 +
be/src/kudu/util/debug/sanitizer_scopes.h | 47 +
be/src/kudu/util/debug/trace_event.h | 1501 +++++++++++
be/src/kudu/util/debug/trace_event_impl.cc | 2436 ++++++++++++++++++
be/src/kudu/util/debug/trace_event_impl.h | 726 ++++++
.../util/debug/trace_event_impl_constants.cc | 14 +
be/src/kudu/util/debug/trace_event_memory.h | 28 +
.../util/debug/trace_event_synthetic_delay.cc | 238 ++
.../util/debug/trace_event_synthetic_delay.h | 166 ++
be/src/kudu/util/debug/trace_logging.h | 132 +
be/src/kudu/util/debug/unwind_safeness.cc | 164 ++
be/src/kudu/util/debug/unwind_safeness.h | 29 +
be/src/kudu/util/debug_ref_counted.h | 56 +
be/src/kudu/util/decimal_util-test.cc | 81 +
be/src/kudu/util/decimal_util.cc | 89 +
be/src/kudu/util/decimal_util.h | 69 +
be/src/kudu/util/easy_json-test.cc | 106 +
be/src/kudu/util/easy_json.cc | 212 ++
be/src/kudu/util/easy_json.h | 190 ++
be/src/kudu/util/env-test.cc | 1173 +++++++++
be/src/kudu/util/env.cc | 93 +
be/src/kudu/util/env.h | 681 +++++
be/src/kudu/util/env_posix.cc | 1852 +++++++++++++
be/src/kudu/util/env_util-test.cc | 192 ++
be/src/kudu/util/env_util.cc | 320 +++
be/src/kudu/util/env_util.h | 112 +
be/src/kudu/util/errno-test.cc | 50 +
be/src/kudu/util/errno.cc | 52 +
be/src/kudu/util/errno.h | 36 +
be/src/kudu/util/faststring-test.cc | 65 +
be/src/kudu/util/faststring.cc | 72 +
be/src/kudu/util/faststring.h | 259 ++
be/src/kudu/util/fault_injection.cc | 78 +
be/src/kudu/util/fault_injection.h | 98 +
be/src/kudu/util/file_cache-stress-test.cc | 402 +++
be/src/kudu/util/file_cache-test-util.h | 92 +
be/src/kudu/util/file_cache-test.cc | 361 +++
be/src/kudu/util/file_cache.cc | 654 +++++
be/src/kudu/util/file_cache.h | 209 ++
be/src/kudu/util/flag_tags-test.cc | 135 +
be/src/kudu/util/flag_tags.cc | 91 +
be/src/kudu/util/flag_tags.h | 169 ++
be/src/kudu/util/flag_validators-test.cc | 252 ++
be/src/kudu/util/flag_validators.cc | 67 +
be/src/kudu/util/flag_validators.h | 102 +
be/src/kudu/util/flags-test.cc | 109 +
be/src/kudu/util/flags.cc | 604 +++++
be/src/kudu/util/flags.h | 89 +
be/src/kudu/util/group_varint-inl.h | 294 +++
be/src/kudu/util/group_varint-test.cc | 144 ++
be/src/kudu/util/group_varint.cc | 81 +
be/src/kudu/util/hash_util-test.cc | 42 +
be/src/kudu/util/hash_util.h | 71 +
be/src/kudu/util/hdr_histogram-test.cc | 116 +
be/src/kudu/util/hdr_histogram.cc | 501 ++++
be/src/kudu/util/hdr_histogram.h | 351 +++
be/src/kudu/util/hexdump.cc | 85 +
be/src/kudu/util/hexdump.h | 34 +
be/src/kudu/util/high_water_mark.h | 85 +
be/src/kudu/util/histogram.proto | 48 +
be/src/kudu/util/init.cc | 89 +
be/src/kudu/util/init.h | 33 +
be/src/kudu/util/inline_slice-test.cc | 88 +
be/src/kudu/util/inline_slice.h | 181 ++
be/src/kudu/util/int128-test.cc | 69 +
be/src/kudu/util/int128.h | 46 +
be/src/kudu/util/int128_util.h | 39 +
be/src/kudu/util/interval_tree-inl.h | 444 ++++
be/src/kudu/util/interval_tree-test.cc | 353 +++
be/src/kudu/util/interval_tree.h | 158 ++
be/src/kudu/util/jsonreader-test.cc | 193 ++
be/src/kudu/util/jsonreader.cc | 141 +
be/src/kudu/util/jsonreader.h | 92 +
be/src/kudu/util/jsonwriter-test.cc | 216 ++
be/src/kudu/util/jsonwriter.cc | 352 +++
be/src/kudu/util/jsonwriter.h | 102 +
be/src/kudu/util/jsonwriter_test.proto | 79 +
be/src/kudu/util/kernel_stack_watchdog.cc | 256 ++
be/src/kudu/util/kernel_stack_watchdog.h | 290 +++
be/src/kudu/util/knapsack_solver-test.cc | 172 ++
be/src/kudu/util/knapsack_solver.h | 269 ++
be/src/kudu/util/locks.cc | 47 +
be/src/kudu/util/locks.h | 294 +++
be/src/kudu/util/logging-test.cc | 249 ++
be/src/kudu/util/logging.cc | 413 +++
be/src/kudu/util/logging.h | 367 +++
be/src/kudu/util/logging_callback.h | 46 +
be/src/kudu/util/logging_test_util.h | 60 +
be/src/kudu/util/maintenance_manager-test.cc | 369 +++
be/src/kudu/util/maintenance_manager.cc | 550 ++++
be/src/kudu/util/maintenance_manager.h | 361 +++
be/src/kudu/util/maintenance_manager.proto | 54 +
be/src/kudu/util/make_shared.h | 64 +
be/src/kudu/util/malloc.cc | 35 +
be/src/kudu/util/malloc.h | 32 +
be/src/kudu/util/map-util-test.cc | 116 +
be/src/kudu/util/mem_tracker-test.cc | 285 ++
be/src/kudu/util/mem_tracker.cc | 296 +++
be/src/kudu/util/mem_tracker.h | 272 ++
be/src/kudu/util/memcmpable_varint-test.cc | 220 ++
be/src/kudu/util/memcmpable_varint.cc | 257 ++
be/src/kudu/util/memcmpable_varint.h | 45 +
be/src/kudu/util/memory/arena-test.cc | 205 ++
be/src/kudu/util/memory/arena.cc | 167 ++
be/src/kudu/util/memory/arena.h | 501 ++++
be/src/kudu/util/memory/memory.cc | 339 +++
be/src/kudu/util/memory/memory.h | 970 +++++++
be/src/kudu/util/memory/overwrite.cc | 42 +
be/src/kudu/util/memory/overwrite.h | 33 +
be/src/kudu/util/metrics-test.cc | 388 +++
be/src/kudu/util/metrics.cc | 746 ++++++
be/src/kudu/util/metrics.h | 1195 +++++++++
be/src/kudu/util/minidump-test.cc | 149 ++
be/src/kudu/util/minidump.cc | 382 +++
be/src/kudu/util/minidump.h | 104 +
be/src/kudu/util/monotime-test.cc | 424 +++
be/src/kudu/util/monotime.cc | 334 +++
be/src/kudu/util/monotime.h | 421 +++
be/src/kudu/util/mt-hdr_histogram-test.cc | 116 +
be/src/kudu/util/mt-metrics-test.cc | 128 +
be/src/kudu/util/mt-threadlocal-test.cc | 357 +++
be/src/kudu/util/mutex.cc | 164 ++
be/src/kudu/util/mutex.h | 142 +
be/src/kudu/util/net/dns_resolver-test.cc | 59 +
be/src/kudu/util/net/dns_resolver.cc | 65 +
be/src/kudu/util/net/dns_resolver.h | 62 +
be/src/kudu/util/net/net_util-test.cc | 170 ++
be/src/kudu/util/net/net_util.cc | 402 +++
be/src/kudu/util/net/net_util.h | 166 ++
be/src/kudu/util/net/sockaddr.cc | 136 +
be/src/kudu/util/net/sockaddr.h | 94 +
be/src/kudu/util/net/socket-test.cc | 89 +
be/src/kudu/util/net/socket.cc | 590 +++++
be/src/kudu/util/net/socket.h | 178 ++
be/src/kudu/util/nvm_cache.cc | 577 +++++
be/src/kudu/util/nvm_cache.h | 31 +
be/src/kudu/util/object_pool-test.cc | 86 +
be/src/kudu/util/object_pool.h | 166 ++
be/src/kudu/util/oid_generator-test.cc | 52 +
be/src/kudu/util/oid_generator.cc | 65 +
be/src/kudu/util/oid_generator.h | 63 +
be/src/kudu/util/once-test.cc | 113 +
be/src/kudu/util/once.cc | 32 +
be/src/kudu/util/once.h | 116 +
be/src/kudu/util/os-util-test.cc | 62 +
be/src/kudu/util/os-util.cc | 185 ++
be/src/kudu/util/os-util.h | 72 +
be/src/kudu/util/path_util-test.cc | 77 +
be/src/kudu/util/path_util.cc | 122 +
be/src/kudu/util/path_util.h | 63 +
be/src/kudu/util/pb_util-internal.cc | 105 +
be/src/kudu/util/pb_util-internal.h | 136 +
be/src/kudu/util/pb_util-test.cc | 661 +++++
be/src/kudu/util/pb_util.cc | 1088 ++++++++
be/src/kudu/util/pb_util.h | 513 ++++
be/src/kudu/util/pb_util.proto | 45 +
be/src/kudu/util/pb_util_test.proto | 29 +
be/src/kudu/util/process_memory-test.cc | 75 +
be/src/kudu/util/process_memory.cc | 287 +++
be/src/kudu/util/process_memory.h | 62 +
be/src/kudu/util/promise.h | 79 +
be/src/kudu/util/proto_container_test.proto | 25 +
be/src/kudu/util/proto_container_test2.proto | 29 +
be/src/kudu/util/proto_container_test3.proto | 33 +
be/src/kudu/util/protobuf-annotations.h | 33 +
be/src/kudu/util/protobuf_util.h | 39 +
be/src/kudu/util/protoc-gen-insertions.cc | 77 +
be/src/kudu/util/pstack_watcher-test.cc | 100 +
be/src/kudu/util/pstack_watcher.cc | 249 ++
be/src/kudu/util/pstack_watcher.h | 101 +
be/src/kudu/util/random-test.cc | 171 ++
be/src/kudu/util/random.h | 252 ++
be/src/kudu/util/random_util-test.cc | 75 +
be/src/kudu/util/random_util.cc | 65 +
be/src/kudu/util/random_util.h | 44 +
be/src/kudu/util/rle-encoding.h | 523 ++++
be/src/kudu/util/rle-test.cc | 546 ++++
be/src/kudu/util/rolling_log-test.cc | 147 ++
be/src/kudu/util/rolling_log.cc | 285 ++
be/src/kudu/util/rolling_log.h | 128 +
be/src/kudu/util/rw_mutex-test.cc | 185 ++
be/src/kudu/util/rw_mutex.cc | 207 ++
be/src/kudu/util/rw_mutex.h | 123 +
be/src/kudu/util/rw_semaphore-test.cc | 94 +
be/src/kudu/util/rw_semaphore.h | 206 ++
be/src/kudu/util/rwc_lock-test.cc | 147 ++
be/src/kudu/util/rwc_lock.cc | 136 +
be/src/kudu/util/rwc_lock.h | 142 +
be/src/kudu/util/safe_math-test.cc | 56 +
be/src/kudu/util/safe_math.h | 69 +
be/src/kudu/util/scoped_cleanup-test.cc | 56 +
be/src/kudu/util/scoped_cleanup.h | 67 +
be/src/kudu/util/semaphore.cc | 105 +
be/src/kudu/util/semaphore.h | 77 +
be/src/kudu/util/semaphore_macosx.cc | 75 +
be/src/kudu/util/signal.cc | 47 +
be/src/kudu/util/signal.h | 42 +
be/src/kudu/util/slice-test.cc | 61 +
be/src/kudu/util/slice.cc | 97 +
be/src/kudu/util/slice.h | 332 +++
.../util/sorted_disjoint_interval_list-test.cc | 98 +
.../kudu/util/sorted_disjoint_interval_list.h | 95 +
be/src/kudu/util/spinlock_profiling-test.cc | 81 +
be/src/kudu/util/spinlock_profiling.cc | 308 +++
be/src/kudu/util/spinlock_profiling.h | 72 +
be/src/kudu/util/stack_watchdog-test.cc | 152 ++
be/src/kudu/util/status-test.cc | 119 +
be/src/kudu/util/status.cc | 170 ++
be/src/kudu/util/status.h | 493 ++++
be/src/kudu/util/status_callback.cc | 41 +
be/src/kudu/util/status_callback.h | 54 +
be/src/kudu/util/stopwatch.h | 364 +++
be/src/kudu/util/string_case-test.cc | 65 +
be/src/kudu/util/string_case.cc | 76 +
be/src/kudu/util/string_case.h | 48 +
be/src/kudu/util/striped64-test.cc | 163 ++
be/src/kudu/util/striped64.cc | 191 ++
be/src/kudu/util/striped64.h | 168 ++
be/src/kudu/util/subprocess-test.cc | 381 +++
be/src/kudu/util/subprocess.cc | 815 ++++++
be/src/kudu/util/subprocess.h | 219 ++
be/src/kudu/util/test_graph.cc | 121 +
be/src/kudu/util/test_graph.h | 90 +
be/src/kudu/util/test_macros.h | 123 +
be/src/kudu/util/test_main.cc | 109 +
be/src/kudu/util/test_util.cc | 446 ++++
be/src/kudu/util/test_util.h | 146 ++
be/src/kudu/util/test_util_prod.cc | 28 +
be/src/kudu/util/test_util_prod.h | 32 +
be/src/kudu/util/thread-test.cc | 160 ++
be/src/kudu/util/thread.cc | 628 +++++
be/src/kudu/util/thread.h | 373 +++
be/src/kudu/util/thread_restrictions.cc | 87 +
be/src/kudu/util/thread_restrictions.h | 121 +
be/src/kudu/util/threadlocal.cc | 89 +
be/src/kudu/util/threadlocal.h | 128 +
be/src/kudu/util/threadlocal_cache.h | 110 +
be/src/kudu/util/threadpool-test.cc | 941 +++++++
be/src/kudu/util/threadpool.cc | 766 ++++++
be/src/kudu/util/threadpool.h | 505 ++++
be/src/kudu/util/throttler-test.cc | 76 +
be/src/kudu/util/throttler.cc | 67 +
be/src/kudu/util/throttler.h | 62 +
be/src/kudu/util/trace-test.cc | 891 +++++++
be/src/kudu/util/trace.cc | 259 ++
be/src/kudu/util/trace.h | 292 +++
be/src/kudu/util/trace_metrics.cc | 74 +
be/src/kudu/util/trace_metrics.h | 89 +
be/src/kudu/util/url-coding-test.cc | 112 +
be/src/kudu/util/url-coding.cc | 208 ++
be/src/kudu/util/url-coding.h | 69 +
be/src/kudu/util/user-test.cc | 44 +
be/src/kudu/util/user.cc | 90 +
be/src/kudu/util/user.h | 32 +
be/src/kudu/util/version_info.cc | 84 +
be/src/kudu/util/version_info.h | 51 +
be/src/kudu/util/version_info.proto | 32 +
be/src/kudu/util/version_util-test.cc | 66 +
be/src/kudu/util/version_util.cc | 83 +
be/src/kudu/util/version_util.h | 58 +
be/src/kudu/util/web_callback_registry.h | 129 +
be/src/kudu/util/website_util.cc | 43 +
be/src/kudu/util/website_util.h | 35 +
be/src/kudu/util/zlib.cc | 127 +
be/src/kudu/util/zlib.h | 39 +
450 files changed, 99139 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/CMakeLists.txt b/be/src/kudu/rpc/CMakeLists.txt
new file mode 100644
index 0000000..f8cdb02
--- /dev/null
+++ b/be/src/kudu/rpc/CMakeLists.txt
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#### Global header protobufs
+PROTOBUF_GENERATE_CPP(
+ RPC_HEADER_PROTO_SRCS RPC_HEADER_PROTO_HDRS RPC_HEADER_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rpc_header.proto)
+ADD_EXPORTABLE_LIBRARY(rpc_header_proto
+ SRCS ${RPC_HEADER_PROTO_SRCS}
+ DEPS protobuf pb_util_proto token_proto
+ NONLINK_DEPS ${RPC_HEADER_PROTO_TGTS})
+
+PROTOBUF_GENERATE_CPP(
+ RPC_INTROSPECTION_PROTO_SRCS RPC_INTROSPECTION_PROTO_HDRS RPC_INTROSPECTION_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rpc_introspection.proto)
+set(RPC_INTROSPECTION_PROTO_LIBS
+ rpc_header_proto
+ protobuf)
+ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
+ SRCS ${RPC_INTROSPECTION_PROTO_SRCS}
+ DEPS ${RPC_INTROSPECTION_PROTO_LIBS}
+ NONLINK_DEPS ${RPC_INTROSPECTION_PROTO_TGTS})
+
+### RPC library
+set(KRPC_SRCS
+ acceptor_pool.cc
+ blocking_ops.cc
+ client_negotiation.cc
+ connection.cc
+ connection_id.cc
+ constants.cc
+ inbound_call.cc
+ messenger.cc
+ negotiation.cc
+ outbound_call.cc
+ periodic.cc
+ proxy.cc
+ reactor.cc
+ remote_method.cc
+ remote_user.cc
+ request_tracker.cc
+ result_tracker.cc
+ rpc.cc
+ rpc_context.cc
+ rpc_controller.cc
+ rpc_sidecar.cc
+ rpcz_store.cc
+ sasl_common.cc
+ sasl_helper.cc
+ serialization.cc
+ server_negotiation.cc
+ service_if.cc
+ service_pool.cc
+ service_queue.cc
+ user_credentials.cc
+ transfer.cc
+)
+
+set(KRPC_LIBS
+ cyrus_sasl
+ gssapi_krb5
+ gutil
+ kudu_util
+ libev
+ rpc_header_proto
+ rpc_introspection_proto
+ security)
+
+ADD_EXPORTABLE_LIBRARY(krpc
+ SRCS ${KRPC_SRCS}
+ DEPS ${KRPC_LIBS})
+
+### RPC generator tool
+add_executable(protoc-gen-krpc protoc-gen-krpc.cc)
+target_link_libraries(protoc-gen-krpc
+ ${KUDU_BASE_LIBS}
+ rpc_header_proto
+ protoc
+ protobuf
+ gutil
+ kudu_util)
+
+#### RPC test
+PROTOBUF_GENERATE_CPP(
+ RPC_TEST_DIFF_PACKAGE_SRCS RPC_TEST_DIFF_PACKAGE_HDRS RPC_TEST_DIFF_PACKAGE_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rtest_diff_package.proto)
+add_library(rtest_diff_package_proto ${RPC_TEST_DIFF_PACKAGE_SRCS} ${RPC_TEST_DIFF_PACKAGE_HDRS})
+target_link_libraries(rtest_diff_package_proto rpc_header_proto)
+
+KRPC_GENERATE(
+ RTEST_KRPC_SRCS RTEST_KRPC_HDRS RTEST_KRPC_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES rtest.proto)
+add_library(rtest_krpc ${RTEST_KRPC_SRCS} ${RTEST_KRPC_HDRS})
+target_link_libraries(rtest_krpc
+ krpc
+ rpc_header_proto
+ rtest_diff_package_proto)
+
+# Tests
+set(KUDU_TEST_LINK_LIBS
+ krpc
+ mini_kdc
+ rpc_header_proto
+ rtest_krpc
+ security_test_util
+ ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(exactly_once_rpc-test PROCESSORS 10)
+ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
+ADD_KUDU_TEST(periodic-test)
+ADD_KUDU_TEST(reactor-test)
+ADD_KUDU_TEST(request_tracker-test)
+ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
+ADD_KUDU_TEST(rpc-test)
+ADD_KUDU_TEST(rpc_stub-test)
+ADD_KUDU_TEST(service_queue-test RUN_SERIAL true)
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc
new file mode 100644
index 0000000..e4bcbd1
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.cc
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/acceptor_pool.h"
+
+#include <string>
+#include <ostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace google {
+namespace protobuf {
+
+class Message;
+
+}
+}
+
+using google::protobuf::Message;
+using std::string;
+
+METRIC_DEFINE_counter(server, rpc_connections_accepted,
+ "RPC Connections Accepted",
+ kudu::MetricUnit::kConnections,
+ "Number of incoming TCP connections made to the RPC server");
+
+DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+ "Socket backlog parameter used when listening for RPC connections. "
+ "This defines the maximum length to which the queue of pending "
+ "TCP connections inbound to the RPC server may grow. If a connection "
+ "request arrives when the queue is full, the client may receive "
+ "an error. Higher values may help the server ride over bursts of "
+ "new inbound connection requests.");
+TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
+
+namespace kudu {
+namespace rpc {
+
+AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
+ Sockaddr bind_address)
+ : messenger_(messenger),
+ socket_(socket->Release()),
+ bind_address_(bind_address),
+ rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(
+ messenger->metric_entity())),
+ closing_(false) {}
+
+AcceptorPool::~AcceptorPool() {
+ Shutdown();
+}
+
+Status AcceptorPool::Start(int num_threads) {
+ RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+
+ for (int i = 0; i < num_threads; i++) {
+ scoped_refptr<kudu::Thread> new_thread;
+ Status s = kudu::Thread::Create("acceptor pool", "acceptor",
+ &AcceptorPool::RunThread, this, &new_thread);
+ if (!s.ok()) {
+ Shutdown();
+ return s;
+ }
+ threads_.push_back(new_thread);
+ }
+ return Status::OK();
+}
+
+void AcceptorPool::Shutdown() {
+ if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
+ VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
+ << " already shut down";
+ return;
+ }
+
+#if defined(__linux__)
+ // Shutting down the socket will break us out of accept() if we're in it, and
+ // prevent future accepts.
+ WARN_NOT_OK(socket_.Shutdown(true, true),
+ strings::Substitute("Could not shut down acceptor socket on $0",
+ bind_address_.ToString()));
+#else
+ // Calling shutdown on an accepting (non-connected) socket is illegal on most
+ // platforms (but not Linux). Instead, the accepting threads are interrupted
+ // forcefully.
+ for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ pthread_cancel(thread.get()->pthread_id());
+ }
+#endif
+
+ for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ CHECK_OK(ThreadJoiner(thread.get()).Join());
+ }
+ threads_.clear();
+
+ // Close the socket: keeping the descriptor open and, possibly, receiving late
+ // not-to-be-read messages from the peer does not make much sense. The
+ // Socket::Close() method is called upon destruction of the aggregated socket_
+ // object as well. However, the typical ownership pattern of an AcceptorPool
+ // object includes two references wrapped via a shared_ptr smart pointer: one
+ // is held by Messenger, another by RpcServer. If not calling Socket::Close()
+ // here, it would be necessary to wait until Messenger::Shutdown() is called for
+ // the corresponding messenger object to close this socket.
+ ignore_result(socket_.Close());
+}
+
+Sockaddr AcceptorPool::bind_address() const {
+ return bind_address_;
+}
+
+Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const {
+ return socket_.GetSocketAddress(addr);
+}
+
+int64_t AcceptorPool::num_rpc_connections_accepted() const {
+ return rpc_connections_accepted_->value();
+}
+
+void AcceptorPool::RunThread() {
+ while (true) {
+ Socket new_sock;
+ Sockaddr remote;
+ VLOG(2) << "calling accept() on socket " << socket_.GetFd()
+ << " listening on " << bind_address_.ToString();
+ Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+ if (!s.ok()) {
+ if (Release_Load(&closing_)) {
+ break;
+ }
+ KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString()
+ << THROTTLE_MSG;
+ continue;
+ }
+ s = new_sock.SetNoDelay(true);
+ if (!s.ok()) {
+ KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString()
+ << " failed to set TCP_NODELAY on a newly accepted socket: "
+ << s.ToString() << THROTTLE_MSG;
+ continue;
+ }
+ rpc_connections_accepted_->Increment();
+ messenger_->RegisterInboundSocket(&new_sock, remote);
+ }
+ VLOG(1) << "AcceptorPool shutting down.";
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/acceptor_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h
new file mode 100644
index 0000000..ba1996a
--- /dev/null
+++ b/be/src/kudu/rpc/acceptor_pool.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_ACCEPTOR_POOL_H
+#define KUDU_RPC_ACCEPTOR_POOL_H
+
+#include <stdint.h>
+#include <vector>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Thread;
+
+namespace rpc {
+
+class Messenger;
+
+// A pool of threads calling accept() to create new connections.
+// Acceptor pool threads terminate when they notice that the messenger has been
+// shut down, if Shutdown() is called, or if the pool object is destructed.
+class AcceptorPool {
+ public:
+ // Create a new acceptor pool. Calls socket::Release to take ownership of the
+ // socket.
+ // 'socket' must be already bound, but should not yet be listening.
+ AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+ ~AcceptorPool();
+
+ // Start listening and accepting connections.
+ Status Start(int num_threads);
+
+ // NOTE(review): the implementation is not fully visible in this header;
+ // presumably stops the acceptor threads and closes the listening socket --
+ // confirm against acceptor_pool.cc.
+ void Shutdown();
+
+ // Return the address that the pool is bound to. If the port is specified as
+ // 0, then this will always return port 0.
+ Sockaddr bind_address() const;
+
+ // Return the address that the pool is bound to. This only works while the
+ // socket is open, and if the specified port is 0 then this will return the
+ // actual port that was bound.
+ Status GetBoundAddress(Sockaddr* addr) const;
+
+ // Return the number of connections accepted by this messenger. Thread-safe.
+ int64_t num_rpc_connections_accepted() const;
+
+ private:
+ // Body of each acceptor thread: loops calling accept() on 'socket_'.
+ void RunThread();
+
+ // Messenger that newly accepted sockets are registered with.
+ Messenger *messenger_;
+ // The listening socket that the acceptor threads call accept() on.
+ Socket socket_;
+ // The address returned by bind_address().
+ Sockaddr bind_address_;
+ // The acceptor threads (presumably created by Start() -- confirm).
+ std::vector<scoped_refptr<kudu::Thread> > threads_;
+
+ // Counter backing num_rpc_connections_accepted().
+ scoped_refptr<Counter> rpc_connections_accepted_;
+
+ // Flag read by the acceptor threads after a failed accept() to decide
+ // whether to exit their loop.
+ Atomic32 closing_;
+
+ DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.cc b/be/src/kudu/rpc/blocking_ops.cc
new file mode 100644
index 0000000..f5cd644
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.cc
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/blocking_ops.h"
+
+#include <cstdint>
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::MessageLite;
+
+const char kHTTPHeader[] = "HTTP";
+
+// Verifies that 'sock' is in blocking mode.
+// Returns OK when it is, IllegalState (after a DFATAL log) when the socket is
+// non-blocking, or the error reported while querying the socket's mode.
+Status CheckInBlockingMode(const Socket* sock) {
+  bool nonblocking;
+  RETURN_NOT_OK(sock->IsNonBlocking(&nonblocking));
+  if (!nonblocking) {
+    return Status::OK();
+  }
+
+  static const char* const kErrMsg = "socket is not in blocking mode";
+  LOG(DFATAL) << kErrMsg;
+  return Status::IllegalState(kErrMsg);
+}
+
+// Serializes 'msg' and 'header' and writes them to 'sock' with blocking writes.
+//
+// sock: socket to write to; must be in blocking mode, otherwise the error from
+//   CheckInBlockingMode() is returned.
+// header: header protobuf; serialized together with the payload length.
+// msg: payload protobuf; must be fully initialized.
+// deadline: passed to each blocking write.
+//
+// Returns OK once both the header and the payload have been written, or the
+// first error encountered.
+Status SendFramedMessageBlocking(Socket* sock, const MessageLite& header, const MessageLite& msg,
+                                 const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(header.IsInitialized()) << "header protobuf must be initialized";
+  DCHECK(msg.IsInitialized()) << "msg protobuf must be initialized";
+
+  // Ensure we are in blocking mode.
+  // These blocking calls are typically not in the fast path, so doing this for all build types.
+  RETURN_NOT_OK(CheckInBlockingMode(sock));
+
+  // Serialize message
+  faststring param_buf;
+  serialization::SerializeMessage(msg, &param_buf);
+
+  // Serialize header and initial length. The payload is serialized first
+  // because the header needs to know its size.
+  faststring header_buf;
+  serialization::SerializeHeader(header, param_buf.size(), &header_buf);
+
+  // Write header & param to stream
+  size_t nsent;
+  RETURN_NOT_OK(sock->BlockingWrite(header_buf.data(), header_buf.size(), &nsent, deadline));
+  RETURN_NOT_OK(sock->BlockingWrite(param_buf.data(), param_buf.size(), &nsent, deadline));
+
+  return Status::OK();
+}
+
+// Reads one length-prefixed frame from 'sock' using blocking reads.
+//
+// On success 'recv_buf' holds the length prefix followed by the payload, and
+// 'header' / 'param_buf' are filled in by serialization::ParseMessage().
+// Returns IOError when the advertised payload length exceeds
+// FLAGS_rpc_max_message_size, with a dedicated message when the first bytes
+// spell "HTTP".
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+                                    MessageLite* header, Slice* param_buf,
+                                    const MonoTime& deadline) {
+  DCHECK(sock != nullptr);
+  DCHECK(recv_buf != nullptr);
+  DCHECK(header != nullptr);
+  DCHECK(param_buf != nullptr);
+
+  RETURN_NOT_OK(CheckInBlockingMode(sock));
+
+  // Phase 1: the fixed-size prefix carrying the payload length.
+  recv_buf->clear();
+  recv_buf->resize(kMsgLengthPrefixLength);
+  size_t prefix_recvd = 0;
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data(), kMsgLengthPrefixLength,
+                                   &prefix_recvd, deadline));
+  const uint32_t payload_len = NetworkByteOrder::Load32(recv_buf->data());
+
+  // Reject out-of-bounds sizes, which may stem from network corruption or a
+  // misbehaving peer.
+  if (PREDICT_FALSE(payload_len > FLAGS_rpc_max_message_size)) {
+    // Talking the RPC protocol to an HTTP endpoint (or vice versa) is a common
+    // mistake, so give it a more helpful error.
+    const bool looks_like_http =
+        memcmp(recv_buf->data(), kHTTPHeader, strlen(kHTTPHeader)) == 0;
+    if (looks_like_http) {
+      return Status::IOError(
+          "received invalid RPC message which appears to be an HTTP response. "
+          "Verify that you have specified a valid RPC port and not an HTTP port.");
+    }
+
+    return Status::IOError(
+        strings::Substitute(
+            "received invalid message of size $0 which exceeds"
+            " the rpc_max_message_size of $1 bytes",
+            payload_len, FLAGS_rpc_max_message_size));
+  }
+
+  // Phase 2: the payload itself, stored right after the prefix.
+  size_t payload_recvd = 0;
+  recv_buf->resize(payload_len + kMsgLengthPrefixLength);
+  RETURN_NOT_OK(sock->BlockingRecv(recv_buf->data() + kMsgLengthPrefixLength,
+                                   payload_len, &payload_recvd, deadline));
+  return serialization::ParseMessage(Slice(*recv_buf), header, param_buf);
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/blocking_ops.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/blocking_ops.h b/be/src/kudu/rpc/blocking_ops.h
new file mode 100644
index 0000000..b305ba7
--- /dev/null
+++ b/be/src/kudu/rpc/blocking_ops.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_BLOCKING_OPS_H
+#define KUDU_RPC_BLOCKING_OPS_H
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class faststring;
+class MonoTime;
+class Slice;
+class Socket;
+class Status;
+
+namespace rpc {
+
+// Returns OK if socket is in blocking mode. Otherwise, returns an error:
+// IllegalState if the socket is non-blocking, or the failure reported while
+// querying the socket's mode.
+Status CheckInBlockingMode(const Socket* sock);
+
+// Encode and send a message over a socket. The socket must be in blocking mode.
+// header: Request or Response header protobuf. Must be fully initialized.
+// msg: Protobuf message to send. This message must be fully initialized.
+// deadline: Latest time allowed for send to complete before timeout.
+Status SendFramedMessageBlocking(Socket* sock, const google::protobuf::MessageLite& header,
+ const google::protobuf::MessageLite& msg, const MonoTime& deadline);
+
+// Receive a full message frame from the server. The socket must be in blocking
+// mode. Returns an IOError if the frame's advertised length exceeds the
+// rpc_max_message_size flag.
+// recv_buf: buffer to use for reading the data from the socket.
+// header: Request or Response header protobuf.
+// param_buf: Slice into recv_buf containing unparsed RPC param protobuf data.
+// deadline: Latest time allowed for receive to complete before timeout.
+Status ReceiveFramedMessageBlocking(Socket* sock, faststring* recv_buf,
+ google::protobuf::MessageLite* header, Slice* param_buf, const MonoTime& deadline);
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_BLOCKING_OPS_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..02175f6
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,853 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <cstdint>
+#include <cstring>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <set>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gssapi/gssapi.h>
+#include <gssapi/gssapi_krb5.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::unique_ptr;
+
+using strings::Substitute;
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
+namespace kudu {
+namespace rpc {
+
+// Trampoline registered for SASL_CB_GETOPT in the ClientNegotiation
+// constructor; forwards to ClientNegotiation::GetOptionCb() on the instance
+// that was supplied as the callback context.
+static int ClientNegotiationGetoptCb(ClientNegotiation* client_negotiation,
+ const char* plugin_name,
+ const char* option,
+ const char** result,
+ unsigned* len) {
+ return client_negotiation->GetOptionCb(plugin_name, option, result, len);
+}
+
+// Trampoline registered for SASL_CB_AUTHNAME in the ClientNegotiation
+// constructor; forwards to ClientNegotiation::SimpleCb() on the instance that
+// was supplied as the callback context.
+static int ClientNegotiationSimpleCb(ClientNegotiation* client_negotiation,
+ int id,
+ const char** result,
+ unsigned* len) {
+ return client_negotiation->SimpleCb(id, result, len);
+}
+
+// Trampoline registered for SASL_CB_PASS in the ClientNegotiation constructor;
+// forwards to ClientNegotiation::SecretCb() on the instance that was supplied
+// as the callback context.
+static int ClientNegotiationSecretCb(sasl_conn_t* conn,
+ ClientNegotiation* client_negotiation,
+ int id,
+ sasl_secret_t** psecret) {
+ return client_negotiation->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+ DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+ if (PREDICT_FALSE(!error.has_code())) {
+ return Status::RuntimeError(error.message());
+ }
+ const string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+ switch (error.code()) {
+ case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED: // fall-through
+ case ErrorStatusPB_RpcErrorCodePB_FATAL_INVALID_AUTHENTICATION_TOKEN:
+ return Status::NotAuthorized(code_name, error.message());
+ case ErrorStatusPB_RpcErrorCodePB_ERROR_UNAVAILABLE:
+ return Status::ServiceUnavailable(code_name, error.message());
+ default:
+ return Status::RuntimeError(code_name, error.message());
+ }
+}
+
+// Constructs a client-side negotiation object.
+//
+// socket: the connection to negotiate on; ownership is taken. Must be non-null.
+// tls_context: TLS context used for the handshake; stored as a raw pointer and
+//   must be non-null.
+// authn_token: optional authentication token to offer to the server.
+// encryption: the RPC encryption policy for this connection.
+// sasl_proto_name: service name later passed to sasl_client_new().
+//
+// The deadline starts at MonoTime::Max(); use set_deadline() to change it.
+ClientNegotiation::ClientNegotiation(unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ boost::optional<security::SignedTokenPB> authn_token,
+ RpcEncryption encryption,
+ std::string sasl_proto_name)
+ : socket_(std::move(socket)),
+ helper_(SaslHelper::CLIENT),
+ tls_context_(tls_context),
+ encryption_(encryption),
+ tls_negotiated_(false),
+ authn_token_(std::move(authn_token)),
+ psecret_(nullptr, std::free),
+ negotiated_authn_(AuthenticationType::INVALID),
+ negotiated_mech_(SaslMechanism::INVALID),
+ sasl_proto_name_(std::move(sasl_proto_name)),
+ deadline_(MonoTime::Max()) {
+ // Register the SASL callbacks. Each entry pairs a callback id with one of the
+ // static trampolines above and 'this' as its context; the function pointers
+ // are cast to the generic signature SaslBuildCallback() takes.
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+ reinterpret_cast<int (*)()>(&ClientNegotiationGetoptCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+ reinterpret_cast<int (*)()>(&ClientNegotiationSimpleCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+ reinterpret_cast<int (*)()>(&ClientNegotiationSecretCb), this));
+ // SASL_CB_LIST_END terminates the callback array.
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+ DCHECK(socket_);
+ DCHECK(tls_context_);
+}
+
+// Enables the PLAIN mechanism on the SASL helper and, if that succeeds, stores
+// 'user' and 'pass' as the credentials for it. Neither value is stored when
+// enabling the mechanism fails.
+Status ClientNegotiation::EnablePlain(const string& user, const string& pass) {
+ RETURN_NOT_OK(helper_.EnablePlain());
+ plain_auth_user_ = user;
+ plain_pass_ = pass;
+ return Status::OK();
+}
+
+// Enables the GSSAPI mechanism by delegating to the SASL helper.
+Status ClientNegotiation::EnableGSSAPI() {
+ return helper_.EnableGSSAPI();
+}
+
+// Returns the SASL mechanism chosen during negotiation. The value is
+// SaslMechanism::INVALID until HandleNegotiate() selects a mechanism.
+SaslMechanism::Type ClientNegotiation::negotiated_mechanism() const {
+ return negotiated_mech_;
+}
+
+// Sets the server's fully qualified domain name on the SASL helper; it is
+// later passed to sasl_client_new() by InitSaslClient().
+void ClientNegotiation::set_server_fqdn(const string& domain_name) {
+ helper_.set_server_fqdn(domain_name);
+}
+
+// Sets the deadline passed to the blocking socket operations performed during
+// negotiation.
+void ClientNegotiation::set_deadline(const MonoTime& deadline) {
+ deadline_ = deadline;
+}
+
+// Runs the complete client side of connection negotiation on the socket:
+// connection header, NEGOTIATE exchange, optional TLS handshake,
+// authentication, and finally the connection context.
+//
+// rpc_error: forwarded to the receive steps; if the server answers with an
+//   error response and this pointer is non-null, the parsed ErrorStatusPB is
+//   stored in it.
+//
+// Returns OK on success, or the first error encountered. The socket must be in
+// blocking mode.
+Status ClientNegotiation::Negotiate(unique_ptr<ErrorStatusPB>* rpc_error) {
+ TRACE("Beginning negotiation");
+
+ // Ensure we can use blocking calls on the socket during negotiation.
+ RETURN_NOT_OK(CheckInBlockingMode(socket_.get()));
+
+ // Step 1: send the connection header.
+ RETURN_NOT_OK(SendConnectionHeader());
+
+ // Receive buffer shared by all of the receive steps below.
+ faststring recv_buf;
+
+ { // Step 2: send and receive the NEGOTIATE step messages.
+ RETURN_NOT_OK(SendNegotiate());
+ NegotiatePB response;
+ RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+ RETURN_NOT_OK(HandleNegotiate(response));
+ TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_));
+ }
+
+ // Step 3: if both ends support TLS, do a TLS handshake.
+ // TODO(KUDU-1921): allow the client to require TLS.
+ if (encryption_ != RpcEncryption::DISABLED &&
+ ContainsKey(server_features_, TLS)) {
+ RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::CLIENT,
+ &tls_handshake_));
+
+ if (negotiated_authn_ == AuthenticationType::SASL) {
+ // When using SASL authentication, verifying the server's certificate is
+ // not necessary. This allows the client to still use TLS encryption for
+ // connections to servers which only have a self-signed certificate.
+ tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+ }
+
+ // To initiate the TLS handshake, we pretend as if the server sent us an
+ // empty TLS_HANDSHAKE token.
+ NegotiatePB initial;
+ initial.set_step(NegotiatePB::TLS_HANDSHAKE);
+ initial.set_tls_handshake("");
+ Status s = HandleTlsHandshake(initial);
+
+ // HandleTlsHandshake() returns Incomplete for as long as another round trip
+ // with the server is needed.
+ while (s.IsIncomplete()) {
+ NegotiatePB response;
+ RETURN_NOT_OK(RecvNegotiatePB(&response, &recv_buf, rpc_error));
+ s = HandleTlsHandshake(response);
+ }
+ RETURN_NOT_OK(s);
+ tls_negotiated_ = true;
+ }
+
+ // Step 4: Authentication
+ switch (negotiated_authn_) {
+ case AuthenticationType::SASL:
+ RETURN_NOT_OK(AuthenticateBySasl(&recv_buf, rpc_error));
+ break;
+ case AuthenticationType::TOKEN:
+ RETURN_NOT_OK(AuthenticateByToken(&recv_buf, rpc_error));
+ break;
+ case AuthenticationType::CERTIFICATE:
+ // The TLS handshake has already authenticated the server.
+ break;
+ case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+ }
+
+ // Step 5: Send connection context.
+ RETURN_NOT_OK(SendConnectionContext());
+
+ TRACE("Negotiation successful");
+ return Status::OK();
+}
+
+// Frames 'msg' with a request header carrying the negotiation call id and
+// writes it to the socket, honoring the negotiation deadline.
+// 'msg' must be initialized and must have its step set.
+Status ClientNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
+  DCHECK(socket_);
+  DCHECK(msg.IsInitialized()) << "message must be initialized";
+  DCHECK(msg.has_step()) << "message must have a step";
+
+  TRACE("Sending $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg.step()));
+
+  RequestHeader req_header;
+  req_header.set_call_id(kNegotiateCallId);
+  return SendFramedMessageBlocking(socket(), req_header, msg, deadline_);
+}
+
+// Receives one framed negotiation response from the server.
+//
+// msg: filled with the parsed NegotiatePB on success.
+// buffer: scratch buffer that backs the received frame.
+// rpc_error: if the server replied with an error response and this pointer is
+//   non-null, receives the parsed ErrorStatusPB.
+//
+// Returns OK when a NegotiatePB was received and parsed. If the response
+// header is flagged as an error, returns the Status produced by ParseError().
+// Also fails if the frame cannot be read or the call id is not the
+// negotiation call id.
+Status ClientNegotiation::RecvNegotiatePB(NegotiatePB* msg,
+                                          faststring* buffer,
+                                          unique_ptr<ErrorStatusPB>* rpc_error) {
+  ResponseHeader header;
+  Slice param_buf;
+  RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), buffer, &header, &param_buf, deadline_));
+  RETURN_NOT_OK(helper_.CheckNegotiateCallId(header.call_id()));
+
+  if (header.is_error()) {
+    return ParseError(param_buf, rpc_error);
+  }
+
+  RETURN_NOT_OK(helper_.ParseNegotiatePB(param_buf, msg));
+  TRACE("Received $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg->step()));
+  return Status::OK();
+}
+
+// Decodes an ErrorStatusPB from 'err_data' and converts it to a Status.
+//
+// Returns IOError when the bytes cannot be parsed; otherwise returns the
+// Status derived from the error PB (which is itself a non-OK status). When
+// 'rpc_error' is non-null and parsing succeeded, the parsed PB is handed to
+// the caller through it.
+Status ClientNegotiation::ParseError(const Slice& err_data,
+                                     unique_ptr<ErrorStatusPB>* rpc_error) {
+  unique_ptr<ErrorStatusPB> parsed_error(new ErrorStatusPB);
+  const bool parsed_ok = parsed_error->ParseFromArray(err_data.data(), err_data.size());
+  if (!parsed_ok) {
+    return Status::IOError("invalid error response, missing fields",
+                           parsed_error->InitializationErrorString());
+  }
+
+  Status result = StatusFromRpcError(*parsed_error);
+  TRACE("Received error response from server: $0", result.ToString());
+
+  if (rpc_error != nullptr) {
+    rpc_error->swap(parsed_error);
+  }
+  return result;
+}
+
+// Writes the fixed-size connection header (magic number followed by the header
+// flags) to the socket using a blocking write bounded by the deadline.
+Status ClientNegotiation::SendConnectionHeader() {
+  uint8_t conn_header[kMagicNumberLength + kHeaderFlagsLength];
+  serialization::SerializeConnHeader(conn_header);
+
+  size_t bytes_written;
+  return socket()->BlockingWrite(conn_header, sizeof(conn_header), &bytes_written, deadline_);
+}
+
+// Creates the SASL client connection object via sasl_client_new() and stores
+// it in sasl_conn_. On failure, the returned Status is prefixed with a message
+// naming the SASL protocol.
+Status ClientNegotiation::InitSaslClient() {
+ // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
+ unsigned flags = 0;
+
+ sasl_conn_t* sasl_conn = nullptr;
+ // No SASL connection exists yet, hence the null connection passed to
+ // WrapSaslCall().
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+ return sasl_client_new(
+ sasl_proto_name_.c_str(), // Registered name of the service using SASL. Required.
+ helper_.server_fqdn(), // The fully qualified domain name of the remote server.
+ nullptr, // Local and remote IP address strings. (we don't use
+ nullptr, // any mechanisms which require this info.)
+ &callbacks_[0], // Connection-specific callbacks.
+ flags,
+ &sasl_conn);
+ }), Substitute("unable to create new SASL $0 client", sasl_proto_name_));
+ // Hand the raw connection to the sasl_conn_ member.
+ sasl_conn_.reset(sasl_conn);
+ return Status::OK();
+}
+
+// Builds and sends the client's NEGOTIATE message, which advertises the RPC
+// feature flags and the authentication types this client can use.
+//
+// Returns NotAuthorized without sending anything when no authentication type
+// can be offered; otherwise returns the result of sending the message.
+Status ClientNegotiation::SendNegotiate() {
+  NegotiatePB negotiate_msg;
+  negotiate_msg.set_step(NegotiatePB::NEGOTIATE);
+
+  // Feature flags: start from the built-in set and add the TLS-related ones
+  // unless encryption is disabled.
+  client_features_ = kSupportedClientRpcFeatureFlags;
+  const bool tls_allowed = encryption_ != RpcEncryption::DISABLED;
+  if (tls_allowed) {
+    client_features_.insert(TLS);
+    // For a local peer we allow TLS to be used for authentication only,
+    // without encryption or integrity.
+    if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+      client_features_.insert(TLS_AUTHENTICATION_ONLY);
+    }
+  }
+  for (RpcFeatureFlag advertised : client_features_) {
+    negotiate_msg.add_supported_features(advertised);
+  }
+
+  // Authentication types, in the order SASL, certificate, token.
+  if (!helper_.EnabledMechs().empty()) {
+    negotiate_msg.add_authn_types()->mutable_sasl();
+  }
+  // Authenticated TLS is only offered when the certificate was generated by
+  // the internal CA.
+  if (tls_context_->has_signed_cert() && !tls_context_->is_external_cert()) {
+    negotiate_msg.add_authn_types()->mutable_certificate();
+  }
+  // TODO(KUDU-1924): check that the authn token is not expired. Can this be done
+  // reliably on clients?
+  if (authn_token_ && tls_context_->has_trusted_cert()) {
+    negotiate_msg.add_authn_types()->mutable_token();
+  }
+
+  if (PREDICT_FALSE(negotiate_msg.authn_types().empty())) {
+    return Status::NotAuthorized("client is not configured with an authentication type");
+  }
+
+  return SendNegotiatePB(negotiate_msg);
+}
+
+// Processes the server's NEGOTIATE response: records the server's feature
+// flags, determines the authentication type (negotiated_authn_) and, for SASL,
+// picks the mechanism (negotiated_mech_).
+//
+// Returns OK when an authentication type (and mechanism, for SASL) has been
+// chosen. Returns NotAuthorized when the step is wrong, when required TLS is
+// not supported by the server, or when no SASL mechanism is shared; returns
+// RuntimeError when the server picks an authentication type this client cannot
+// use or does not recognize.
+Status ClientNegotiation::HandleNegotiate(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::NEGOTIATE)) {
+ return Status::NotAuthorized("expected NEGOTIATE step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received NEGOTIATE response from server");
+
+ // Fill in the set of features supported by the server.
+ for (int flag : response.supported_features()) {
+ // We only add the features that our local build knows about.
+ RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+ static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+ if (feature_flag != UNKNOWN) {
+ server_features_.insert(feature_flag);
+ }
+ }
+
+ if (encryption_ == RpcEncryption::REQUIRED &&
+ !ContainsKey(server_features_, RpcFeatureFlag::TLS)) {
+ return Status::NotAuthorized("server does not support required TLS encryption");
+ }
+
+ // Get the authentication type which the server would like to use.
+ DCHECK_LE(response.authn_types().size(), 1);
+ if (response.authn_types().empty()) {
+ // If the server doesn't send back an authentication type, default to SASL
+ // in order to maintain backwards compatibility.
+ negotiated_authn_ = AuthenticationType::SASL;
+ } else {
+ const auto& authn_type = response.authn_types(0);
+ // Only the SASL case falls through to the mechanism selection below; the
+ // TOKEN and CERTIFICATE cases return directly.
+ switch (authn_type.type_case()) {
+ case AuthenticationTypePB::kSasl:
+ negotiated_authn_ = AuthenticationType::SASL;
+ break;
+ case AuthenticationTypePB::kToken:
+ // TODO(todd): we should also be checking tls_context_->has_trusted_cert()
+ // here to match the original logic we used to advertise TOKEN support,
+ // or perhaps just check explicitly whether we advertised TOKEN.
+ if (!authn_token_) {
+ return Status::RuntimeError(
+ "server chose token authentication, but client has no token");
+ }
+ negotiated_authn_ = AuthenticationType::TOKEN;
+ return Status::OK();
+ case AuthenticationTypePB::kCertificate:
+ if (!tls_context_->has_signed_cert()) {
+ return Status::RuntimeError(
+ "server chose certificate authentication, but client has no certificate");
+ }
+ negotiated_authn_ = AuthenticationType::CERTIFICATE;
+ return Status::OK();
+ case AuthenticationTypePB::TYPE_NOT_SET:
+ return Status::RuntimeError("server chose an unknown authentication type");
+ }
+ }
+
+ DCHECK_EQ(negotiated_authn_, AuthenticationType::SASL);
+
+ // Build a map of the SASL mechanisms offered by the server.
+ set<SaslMechanism::Type> client_mechs(helper_.EnabledMechs());
+ set<SaslMechanism::Type> server_mechs;
+ for (const NegotiatePB::SaslMechanism& sasl_mech : response.sasl_mechanisms()) {
+ auto mech = SaslMechanism::value_of(sasl_mech.mechanism());
+ // Mechanisms this build does not recognize are ignored.
+ if (mech == SaslMechanism::INVALID) {
+ continue;
+ }
+ server_mechs.insert(mech);
+ }
+
+ // Determine which SASL mechanism to use for authenticating the connection.
+ // We pick the most preferred mechanism which is supported by both parties.
+ // The preference list in order of most to least preferred:
+ // * GSSAPI
+ // * PLAIN
+ //
+ // TODO(KUDU-1921): allow the client to require authentication.
+ if (ContainsKey(client_mechs, SaslMechanism::GSSAPI) &&
+ ContainsKey(server_mechs, SaslMechanism::GSSAPI)) {
+
+ // Check that the client has local Kerberos credentials, and if not fall
+ // back to an alternate mechanism.
+ Status s = CheckGSSAPI();
+ if (s.ok()) {
+ negotiated_mech_ = SaslMechanism::GSSAPI;
+ return Status::OK();
+ }
+
+ TRACE("Kerberos authentication credentials are not available: $0", s.ToString());
+ // Drop GSSAPI from the local copy of the client's mechanisms so that the
+ // checks below treat it as unavailable on the client side.
+ client_mechs.erase(SaslMechanism::GSSAPI);
+ }
+
+ if (ContainsKey(client_mechs, SaslMechanism::PLAIN) &&
+ ContainsKey(server_mechs, SaslMechanism::PLAIN)) {
+ negotiated_mech_ = SaslMechanism::PLAIN;
+ return Status::OK();
+ }
+
+ // There are no mechanisms in common.
+ if (ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+ !ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+ return Status::NotAuthorized("server requires authentication, "
+ "but client does not have Kerberos credentials available");
+ }
+ if (!ContainsKey(server_mechs, SaslMechanism::GSSAPI) &&
+ ContainsKey(client_mechs, SaslMechanism::GSSAPI)) {
+ return Status::NotAuthorized("client requires authentication, "
+ "but server does not have Kerberos enabled");
+ }
+ string msg = Substitute("client/server supported SASL mechanism mismatch; "
+ "client mechanisms: [$0], server mechanisms: [$1]",
+ JoinMapped(client_mechs, SaslMechanism::name_of, ", "),
+ JoinMapped(server_mechs, SaslMechanism::name_of, ", "));
+
+ // For now, there should never be a SASL mechanism mismatch that isn't due
+ // to one of the sides requiring Kerberos and the other not having it, so
+ // lets sanity check that.
+ DCHECK(STLSetIntersection(client_mechs, server_mechs).empty()) << msg;
+ return Status::NotAuthorized(msg);
+}
+
+// Wraps 'tls_token' in a TLS_HANDSHAKE NegotiatePB and sends it to the server.
+// The token is taken by value so that its contents can be moved into the
+// message.
+Status ClientNegotiation::SendTlsHandshake(string tls_token) {
+  TRACE("Sending TLS_HANDSHAKE message to server");
+  NegotiatePB handshake_msg;
+  handshake_msg.set_step(NegotiatePB::TLS_HANDSHAKE);
+  *handshake_msg.mutable_tls_handshake() = std::move(tls_token);
+  return SendNegotiatePB(handshake_msg);
+}
+
+// Feeds the server's TLS_HANDSHAKE token into the local TLS handshake.
+//
+// Returns Incomplete after sending the next client token when another round
+// trip is required. Once the handshake completes, returns the result of
+// finishing it: FinishNoWrap() when both sides advertised
+// TLS_AUTHENTICATION_ONLY, Finish() otherwise. Returns NotAuthorized when
+// 'response' is not a TLS_HANDSHAKE step or carries no handshake token.
+Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
+ if (PREDICT_FALSE(response.step() != NegotiatePB::TLS_HANDSHAKE)) {
+ return Status::NotAuthorized("expected TLS_HANDSHAKE step",
+ NegotiatePB::NegotiateStep_Name(response.step()));
+ }
+ TRACE("Received TLS_HANDSHAKE response from server");
+
+ if (PREDICT_FALSE(!response.has_tls_handshake())) {
+ return Status::NotAuthorized("No TLS handshake token in TLS_HANDSHAKE response from server");
+ }
+
+ string token;
+ Status s = tls_handshake_.Continue(response.tls_handshake(), &token);
+ if (s.IsIncomplete()) {
+ // Another roundtrip is required to complete the handshake.
+ RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+ }
+
+ // Check that the handshake step didn't produce an error. Will also propagate
+ // an Incomplete status.
+ RETURN_NOT_OK(s);
+
+ // TLS handshake is finished.
+ if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
+ ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
+ TRACE("Negotiated auth-only $0 with cipher $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription());
+ return tls_handshake_.FinishNoWrap(*socket_);
+ }
+
+ TRACE("Negotiated $0 with cipher $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherDescription());
+ return tls_handshake_.Finish(&socket_);
+}
+
+// Drives the SASL exchange with the server: initiate, answer challenges while
+// the current step reports Incomplete, then expect the success message.
+//
+// recv_buf: scratch buffer used for every received frame.
+// rpc_error: forwarded to RecvNegotiatePB().
+Status ClientNegotiation::AuthenticateBySasl(faststring* recv_buf,
+                                             unique_ptr<ErrorStatusPB>* rpc_error) {
+  RETURN_NOT_OK(InitSaslClient());
+
+  // SendSaslInitiate() and HandleSaslChallenge() both report Incomplete when
+  // another challenge from the server must be processed.
+  Status step_status = SendSaslInitiate();
+  while (step_status.IsIncomplete()) {
+    NegotiatePB server_challenge;
+    RETURN_NOT_OK(RecvNegotiatePB(&server_challenge, recv_buf, rpc_error));
+    step_status = HandleSaslChallenge(server_challenge);
+  }
+
+  // Any failure from the initiate or challenge steps ends the exchange here.
+  RETURN_NOT_OK(step_status);
+
+  // The challenges are done; the next message has to be the success message.
+  NegotiatePB success_msg;
+  RETURN_NOT_OK(RecvNegotiatePB(&success_msg, recv_buf, rpc_error));
+  return HandleSaslSuccess(success_msg);
+}
+
+// Authenticates with the authentication token: sends it in a TOKEN_EXCHANGE
+// message and expects a TOKEN_EXCHANGE message back.
+//
+// The token is moved out of authn_token_ into the outgoing message. Crashes
+// (CHECK) if TLS has not been negotiated, since the token must never travel
+// over an unencrypted channel.
+Status ClientNegotiation::AuthenticateByToken(faststring* recv_buf,
+                                              unique_ptr<ErrorStatusPB>* rpc_error) {
+  CHECK(tls_negotiated_);
+
+  // Send the token to the server.
+  {
+    NegotiatePB token_request;
+    token_request.set_step(NegotiatePB::TOKEN_EXCHANGE);
+    *token_request.mutable_authn_token() = std::move(*authn_token_);
+    RETURN_NOT_OK(SendNegotiatePB(token_request));
+  }
+
+  // The server must answer with a non-error TOKEN_EXCHANGE message.
+  NegotiatePB token_reply;
+  RETURN_NOT_OK(RecvNegotiatePB(&token_reply, recv_buf, rpc_error));
+  if (token_reply.step() == NegotiatePB::TOKEN_EXCHANGE) {
+    return Status::OK();
+  }
+  return Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+                               NegotiatePB::NegotiateStep_Name(token_reply.step()));
+}
+
+// Starts the SASL exchange with the already-selected mechanism
+// (negotiated_mech_) and sends the SASL_INITIATE message to the server.
+//
+// Returns the status of sasl_client_start() as reported by WrapSaslCall() --
+// OK or Incomplete -- once the message has been sent, so the caller can tell
+// whether further challenges are expected. Any other SASL failure, a failure
+// to enable integrity protection (GSSAPI only), or a send failure is returned
+// as-is.
+Status ClientNegotiation::SendSaslInitiate() {
+ TRACE("Initiating SASL $0 handshake", SaslMechanism::name_of(negotiated_mech_));
+
+ // At this point we've already chosen the SASL mechanism to use
+ // (negotiated_mech_), but we need to let the SASL library know. SASL likes to
+ // choose the mechanism from among a list of possible options, so we simply
+ // provide it one option, and then check that it picks that option.
+
+ const char* init_msg = nullptr;
+ unsigned init_msg_len = 0;
+ const char* negotiated_mech = nullptr;
+
+ /* select a mechanism for a connection
+ * mechlist -- mechanisms server has available (punctuation ignored)
+ * output:
+ * prompt_need -- on SASL_INTERACT, list of prompts needed to continue
+ * clientout -- the initial client response to send to the server
+ * mech -- set to mechanism name
+ *
+ * Returns:
+ * SASL_OK -- success
+ * SASL_CONTINUE -- negotiation required
+ * SASL_NOMEM -- not enough memory
+ * SASL_NOMECH -- no mechanism meets requested properties
+ * SASL_INTERACT -- user interaction needed to fill in prompt_need list
+ */
+ TRACE("Calling sasl_client_start()");
+ const Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+ return sasl_client_start(
+ sasl_conn_.get(), // The SASL connection context created by init()
+ SaslMechanism::name_of(negotiated_mech_), // The list of mechanisms to negotiate.
+ nullptr, // Disables INTERACT return if NULL.
+ &init_msg, // Filled in on success.
+ &init_msg_len, // Filled in on success.
+ &negotiated_mech); // Filled in on success.
+ });
+
+ // Only OK and Incomplete allow the exchange to proceed.
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ return s;
+ }
+
+ // Check that the SASL library is using the mechanism that we picked.
+ DCHECK_EQ(SaslMechanism::value_of(negotiated_mech), negotiated_mech_);
+
+ // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
+ // integrity protection so that the channel bindings and nonce can be
+ // verified.
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ RETURN_NOT_OK(EnableProtection(sasl_conn_.get(), SaslProtection::kIntegrity));
+ }
+
+ NegotiatePB msg;
+ msg.set_step(NegotiatePB::SASL_INITIATE);
+ msg.mutable_token()->assign(init_msg, init_msg_len);
+ msg.add_sasl_mechanisms()->set_mechanism(negotiated_mech);
+ RETURN_NOT_OK(SendNegotiatePB(msg));
+ // Propagate the OK/Incomplete result of sasl_client_start() to the caller.
+ return s;
+}
+
+// Wraps the given SASL token bytes in a SASL_RESPONSE message and sends it.
+// Returns the status of the send.
+Status ClientNegotiation::SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) {
+  NegotiatePB sasl_response;
+  sasl_response.mutable_token()->assign(resp_msg, resp_msg_len);
+  sasl_response.set_step(NegotiatePB::SASL_RESPONSE);
+  return SendNegotiatePB(sasl_response);
+}
+
+// Processes a SASL_CHALLENGE message: runs the next SASL client step on the
+// server's token and sends the output back as a SASL_RESPONSE.
+//
+// Returns NotAuthorized if 'response' is not a SASL_CHALLENGE or carries no
+// token. Otherwise returns the SASL step status (OK or Incomplete) once the
+// response has been sent, or the first error encountered.
+Status ClientNegotiation::HandleSaslChallenge(const NegotiatePB& response) {
+  const auto step = response.step();
+  if (PREDICT_FALSE(step != NegotiatePB::SASL_CHALLENGE)) {
+    return Status::NotAuthorized("expected SASL_CHALLENGE step",
+                                 NegotiatePB::NegotiateStep_Name(step));
+  }
+  TRACE("Received SASL_CHALLENGE response from server");
+
+  if (PREDICT_FALSE(!response.has_token())) {
+    return Status::NotAuthorized("no token in SASL_CHALLENGE response from server");
+  }
+
+  const char* reply_token = nullptr;
+  unsigned reply_token_len = 0;
+  const Status step_status = DoSaslStep(response.token(), &reply_token, &reply_token_len);
+
+  // Anything other than OK or Incomplete aborts the exchange.
+  if (PREDICT_FALSE(!(step_status.ok() || step_status.IsIncomplete()))) {
+    return step_status;
+  }
+
+  RETURN_NOT_OK(SendSaslResponse(reply_token, reply_token_len));
+  return step_status;
+}
+
+// Processes a SASL_SUCCESS message from the server.
+//
+// For GSSAPI, stores the server's nonce (if one was sent) and, when TLS has
+// been negotiated, checks the server-supplied channel bindings against the
+// bindings derived from the remote TLS certificate.
+//
+// Returns NotAuthorized if 'response' is not a SASL_SUCCESS, if channel
+// bindings are required but missing, or if they do not match; other errors are
+// propagated from the helpers that failed.
+Status ClientNegotiation::HandleSaslSuccess(const NegotiatePB& response) {
+  if (PREDICT_FALSE(response.step() != NegotiatePB::SASL_SUCCESS)) {
+    return Status::NotAuthorized("expected SASL_SUCCESS step",
+                                 NegotiatePB::NegotiateStep_Name(response.step()));
+  }
+  TRACE("Received SASL_SUCCESS response from server");
+
+  // Nonce and channel-binding handling only applies to GSSAPI.
+  if (negotiated_mech_ != SaslMechanism::GSSAPI) {
+    return Status::OK();
+  }
+
+  if (response.has_nonce()) {
+    // Keep the server's nonce; it is sent back later, SASL integrity
+    // protected, as part of the connection context.
+    nonce_ = response.nonce();
+  }
+
+  // Channel bindings are only verified when TLS was negotiated.
+  if (!tls_negotiated_) {
+    return Status::OK();
+  }
+
+  if (!response.has_channel_bindings()) {
+    return Status::NotAuthorized("no channel bindings provided by server");
+  }
+
+  security::Cert remote_cert;
+  RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&remote_cert));
+
+  string expected_bindings;
+  RETURN_NOT_OK_PREPEND(remote_cert.GetServerEndPointChannelBindings(&expected_bindings),
+                        "failed to generate channel bindings");
+
+  Slice received_bindings;
+  RETURN_NOT_OK_PREPEND(SaslDecode(sasl_conn_.get(),
+                                   response.channel_bindings(),
+                                   &received_bindings),
+                        "failed to decode channel bindings");
+
+  if (expected_bindings != received_bindings) {
+    // Best effort: the peer address is only used for the log message.
+    Sockaddr peer_addr;
+    ignore_result(socket_->GetPeerAddress(&peer_addr));
+
+    LOG(WARNING) << "Received invalid channel bindings from server "
+                 << peer_addr.ToString()
+                 << ", this could indicate an active network man-in-the-middle";
+    return Status::NotAuthorized("channel bindings do not match");
+  }
+
+  return Status::OK();
+}
+
+// Runs one sasl_client_step() on the server-provided bytes 'in'. The library's
+// output buffer and its length are written to 'out' and 'out_len'. Returns the
+// SASL result as translated by WrapSaslCall().
+Status ClientNegotiation::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+  TRACE("Calling sasl_client_step()");
+
+  const auto run_step = [&]() {
+    return sasl_client_step(sasl_conn_.get(),
+                            in.c_str(),
+                            in.length(),
+                            nullptr,  // No interaction prompts.
+                            out,
+                            out_len);
+  };
+  return WrapSaslCall(sasl_conn_.get(), run_step);
+}
+
+// Builds the ConnectionContextPB and sends it to the server as a framed message
+// with the reserved connection-context call id. If a nonce was stored during
+// negotiation, it is SASL-encoded and included. Returns the status of the
+// encode or send.
+Status ClientNegotiation::SendConnectionContext() {
+  TRACE("Sending connection context");
+
+  RequestHeader ctx_header;
+  ctx_header.set_call_id(kConnectionContextCallId);
+
+  ConnectionContextPB ctx;
+
+  // This field is deprecated but used by servers <Kudu 1.1. Newer server
+  // versions ignore this and use the SASL-provided username instead.
+  auto* legacy_user_info = ctx.mutable_deprecated_user_info();
+  if (plain_auth_user_.empty()) {
+    legacy_user_info->set_real_user("cpp-client");
+  } else {
+    legacy_user_info->set_real_user(plain_auth_user_);
+  }
+
+  if (nonce_) {
+    // Echo the nonce back under SASL protection. The nonce is only ever set
+    // when SASL GSSAPI is in use.
+    Slice encoded;
+    RETURN_NOT_OK(SaslEncode(sasl_conn_.get(), *nonce_, &encoded));
+    ctx.set_encoded_nonce(encoded.ToString());
+  }
+
+  return SendFramedMessageBlocking(socket(), ctx_header, ctx, deadline_);
+}
+
+// SASL option callback. Forwards the request unchanged to the shared helper and
+// returns whatever SASL result code the helper produces.
+int ClientNegotiation::GetOptionCb(const char* plugin_name, const char* option,
+                                   const char** result, unsigned* len) {
+  const int sasl_rc = helper_.GetOptionCb(plugin_name, option, result, len);
+  return sasl_rc;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE.
+//
+// For SASL_CB_USER and SASL_CB_AUTHNAME, points '*result' at the configured
+// PLAIN user name and, when 'len' is non-null, stores its length. Returns
+// SASL_OK in that case, SASL_FAIL when PLAIN is not enabled, and SASL_BADPARAM
+// for a null 'result' or any other callback id.
+int ClientNegotiation::SimpleCb(int id, const char** result, unsigned* len) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Simple callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  if (PREDICT_FALSE(result == nullptr)) {
+    LOG(DFATAL) << "result outparam is NULL";
+    return SASL_BADPARAM;
+  }
+
+  // Reject the callback ids we cannot answer before touching the outputs.
+  if (id == SASL_CB_LANGUAGE) {
+    LOG(DFATAL) << "Unable to handle SASL callback type SASL_CB_LANGUAGE"
+                << "(" << id << ")";
+    return SASL_BADPARAM;
+  }
+  if (id != SASL_CB_USER && id != SASL_CB_AUTHNAME) {
+    LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+    return SASL_BADPARAM;
+  }
+
+  // TODO(unknown): Support impersonation?
+  // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+  // Until then both ids are answered with the same PLAIN user name.
+  if (id == SASL_CB_USER) {
+    TRACE("callback for SASL_CB_USER");
+  } else {
+    TRACE("callback for SASL_CB_AUTHNAME");
+  }
+  *result = plain_auth_user_.c_str();
+  if (len != nullptr) {
+    *len = plain_auth_user_.length();
+  }
+
+  return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+//
+// Allocates a sasl_secret_t holding a copy of the configured PLAIN password,
+// stores it in '*psecret', and hands ownership to psecret_ so it is released
+// later. Returns SASL_OK on success, SASL_FAIL when PLAIN is not enabled,
+// SASL_BADPARAM for null arguments or an unexpected callback id, and
+// SASL_NOMEM when the allocation fails.
+int ClientNegotiation::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Plain secret callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+
+  if (id != SASL_CB_PASS) {
+    LOG(DFATAL) << "Unexpected SASL callback type: " << id;
+    return SASL_BADPARAM;
+  }
+
+  if (!conn || !psecret) {
+    return SASL_BADPARAM;
+  }
+
+  const size_t pass_len = plain_pass_.length();
+  sasl_secret_t* secret =
+      static_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + pass_len));
+  *psecret = secret;
+  if (!secret) {
+    return SASL_NOMEM;
+  }
+
+  psecret_.reset(secret);  // Ensure that we free() this structure later.
+  secret->len = pass_len;
+  // Copy the password together with its terminating NUL byte.
+  memcpy(secret->data, plain_pass_.c_str(), pass_len + 1);
+
+  return SASL_OK;
+}
+
+namespace {
+// Retrieve the GSSAPI error description for an error code and type.
+//
+// 'code' is the status value to describe and 'type' selects how it is
+// interpreted (GSS_C_GSS_CODE or GSS_C_MECH_CODE). A single status value may
+// expand to several messages; they are joined with ": " in the order the
+// library returns them.
+//
+// If gss_display_status() itself fails, the messages gathered so far (possibly
+// none) are returned. The buffer is not read in that case, since its contents
+// are not guaranteed to be valid, and iteration stops because
+// 'message_context' is not guaranteed to reach zero.
+string gss_error_description(OM_uint32 code, int type) {
+  string description;
+  OM_uint32 message_context = 0;
+
+  do {
+    OM_uint32 minor = 0;
+    // Start from an empty buffer so that releasing it is always safe, even if
+    // the library never fills it in.
+    gss_buffer_desc buf = GSS_C_EMPTY_BUFFER;
+    const OM_uint32 major = gss_display_status(&minor, code, type, GSS_C_NULL_OID,
+                                               &message_context, &buf);
+    const bool lookup_failed = GSS_ERROR(major);
+    if (!lookup_failed) {
+      if (!description.empty()) {
+        description.append(": ");
+      }
+      if (buf.value != nullptr) {
+        description.append(static_cast<const char*>(buf.value), buf.length);
+      }
+    }
+    gss_release_buffer(&minor, &buf);
+    if (lookup_failed) {
+      break;
+    }
+  } while (message_context != 0);
+
+  return description;
+}
+
+// Transforms a GSSAPI major and minor error code into a Kudu Status.
+//
+// Returns OK unless 'major' indicates an error. Otherwise returns NotAuthorized
+// whose message is the description of 'major' and whose detail is the
+// mechanism-specific description of 'minor'.
+Status check_gss_error(OM_uint32 major, OM_uint32 minor) {
+  if (GSS_ERROR(major)) {
+    return Status::NotAuthorized(gss_error_description(major, GSS_C_GSS_CODE),
+                                 gss_error_description(minor, GSS_C_MECH_CODE));
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
+// Checks that a usable Kerberos credential is available to this client.
+//
+// Acquires the krb5 initiator credential, queries its remaining lifetime, and
+// releases it again. Returns NotAuthorized if either GSSAPI call reports an
+// error or if the remaining lifetime is zero; otherwise returns OK.
+Status ClientNegotiation::CheckGSSAPI() {
+  OM_uint32 major_status;
+  OM_uint32 minor_status;
+  gss_cred_id_t krb5_cred = GSS_C_NO_CREDENTIAL;
+
+  // Acquire the Kerberos credential. This will fail if the client does not have
+  // a Kerberos tgt ticket. In theory it should be sufficient to call
+  // gss_inquire_cred_by_mech, but that causes a memory leak on RHEL 7.
+  major_status = gss_acquire_cred(&minor_status,
+                                  GSS_C_NO_NAME,
+                                  GSS_C_INDEFINITE,
+                                  const_cast<gss_OID_set>(gss_mech_set_krb5),
+                                  GSS_C_INITIATE,
+                                  &krb5_cred,
+                                  nullptr,
+                                  nullptr);
+  Status result = check_gss_error(major_status, minor_status);
+
+  // The lifetime reported by gss_acquire_cred in the RHEL 6 version of krb5 is
+  // always 0, so the remaining validity of the tgt (in seconds) has to be
+  // fetched with a separate gss_inquire_cred call.
+  OM_uint32 remaining_secs;
+  if (result.ok()) {
+    major_status = gss_inquire_cred(&minor_status, krb5_cred, nullptr,
+                                    &remaining_secs, nullptr, nullptr);
+    result = check_gss_error(major_status, minor_status);
+  }
+
+  // Always release the credential, including when gss_inquire_cred failed.
+  gss_release_cred(&minor_status, &krb5_cred);
+  RETURN_NOT_OK(result);
+
+  // 'remaining_secs' is only read here, after a successful gss_inquire_cred.
+  return remaining_secs == 0 ? Status::NotAuthorized("Kerberos ticket expired")
+                             : Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu