You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/01/28 23:58:20 UTC

[kudu] branch master updated (f1a6335 -> 0f6d33b)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from f1a6335  KUDU-2456 Limit number of pending transactions logged
     new 3988ab9  [java] enable regression test for KUDU-2415
     new 174aa26  docs: add warning about overlapping directories
     new 1f1f875  KUDU-2662 TestKuduClient.testClientLocation is flaky
     new c7c902a  [java] deflake RYW tests in TestKuduClient
     new 0f6d33b  rpc: add TCP socket statistics to /rpcz

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/configuration.adoc                            |   5 +
 .../org/apache/kudu/client/AsyncKuduScanner.java   |   4 +-
 .../org/apache/kudu/client/TestKuduClient.java     |  21 +-
 .../apache/kudu/client/TestScannerMultiTablet.java |  13 +-
 .../java/org/apache/kudu/test/KuduTestHarness.java |  11 +
 src/kudu/rpc/connection.cc                         | 226 ++++++++++++++++++++-
 src/kudu/rpc/connection.h                          |   7 +-
 src/kudu/rpc/inbound_call.cc                       |   2 +-
 src/kudu/rpc/inbound_call.h                        |   4 +-
 src/kudu/rpc/messenger.cc                          |   6 +-
 src/kudu/rpc/messenger.h                           |  10 +-
 src/kudu/rpc/outbound_call.cc                      |   2 +-
 src/kudu/rpc/outbound_call.h                       |   4 +-
 src/kudu/rpc/reactor.cc                            |  12 +-
 src/kudu/rpc/reactor.h                             |  12 +-
 src/kudu/rpc/rpc-test.cc                           |  21 +-
 src/kudu/rpc/rpc_introspection.proto               |  34 +++-
 src/kudu/rpc/rpc_stub-test.cc                      |   8 +-
 src/kudu/server/rpcz-path-handler.cc               |  10 +-
 19 files changed, 353 insertions(+), 59 deletions(-)


[kudu] 02/05: docs: add warning about overlapping directories

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 174aa26fe17ea7b8d90e64372bfa3d27cb4b9bd5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Nov 30 10:43:23 2018 -0800

    docs: add warning about overlapping directories
    
    There wasn't a warning against having overlapping directories for
    different Kudu processes, but there should be since it would cause Kudu
    to not start up.
    
    Change-Id: I8905ef81aa6eaa6fbfa5845110b4016a0b56be9b
    Reviewed-on: http://gerrit.cloudera.org:8080/12013
    Tested-by: Kudu Jenkins
    Reviewed-by: Will Berkeley <wd...@gmail.com>
---
 docs/configuration.adoc | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/configuration.adoc b/docs/configuration.adoc
index ebc1636..770e397 100644
--- a/docs/configuration.adoc
+++ b/docs/configuration.adoc
@@ -84,6 +84,11 @@ Additionally, `--fs_wal_dir` and `--fs_metadata_dir` may be the same as _one
 of_ the directories listed in `--fs_data_dirs`, but must not be sub-directories
 of any of them.
 
+WARNING: Each directory specified by a configuration flag on a given machine
+should be used by at most one Kudu process. If multiple Kudu processes on the
+same machine are configured to use the same directory, Kudu may refuse to start
+up.
+
 WARNING: Once `--fs_data_dirs` is set, extra tooling is required to change it.
 For more details, see the link:administration.html#change_dir_config[Kudu
 Administration docs].


[kudu] 01/05: [java] enable regression test for KUDU-2415

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3988ab934a0d34a4342fd3b6a97720944b7e5aeb
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Fri Jan 25 17:43:51 2019 -0800

    [java] enable regression test for KUDU-2415
    
    As KUDU-2463 fixed the bug in clean time advancement that now it is
    guaranteed to have a non-zero clean time when servicing a scan, it is
    safe to enable the regression test to ensure scanning a never-written-to
    tablet from a fresh client with no propagated timestamp in
    READ_YOUR_WRITE mode should work.
    
    I looped TestScannerMultiTablet 1000 times and no failures are observed.
    
    Change-Id: I9532a435c3b4cf2286630b6809ffa2c8ae1a4acd
    Reviewed-on: http://gerrit.cloudera.org:8080/12278
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <ha...@cloudera.com>
---
 .../java/org/apache/kudu/client/TestScannerMultiTablet.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index dabe437..4ddcdd5 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -32,7 +32,6 @@ import com.stumbleupon.async.Deferred;
 
 import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -274,10 +273,7 @@ public class TestScannerMultiTablet {
   // Scanning a never-written-to tablet from a fresh client with no propagated
   // timestamp in "read-your-writes' mode should not fail.
   @Test(timeout = 100000)
-  @Ignore("TODO(KUDU-2415)") // not fixed yet!
   public void testReadYourWritesFreshClientFreshTable() throws Exception {
-    // NOTE: this test fails because the first tablet in the table
-    // is empty and has never been written to.
 
     // Perform scan in READ_YOUR_WRITES mode. Before the scan, verify that the
     // propagated timestamp is unset, since this is a fresh client.
@@ -289,7 +285,14 @@ public class TestScannerMultiTablet {
     assertEquals(AsyncKuduClient.NO_TIMESTAMP, asyncClient.getLastPropagatedTimestamp());
     assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
 
-    assertEquals(9, countRowsInScan(syncScanner));
+    // Since there isn't any write performed from the client, the count
+    // should range from [0, 9].
+    int count = countRowsInScan(syncScanner);
+    assertTrue(count >= 0);
+    assertTrue(count <= 9);
+
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, asyncClient.getLastPropagatedTimestamp());
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
   }
 
   // Test multi tablets scan in READ_YOUR_WRITES mode for both AUTO_FLUSH_SYNC


[kudu] 05/05: rpc: add TCP socket statistics to /rpcz

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0f6d33b4a29873197952335a5777ccf9163fc307
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Jan 8 15:01:29 2019 -0800

    rpc: add TCP socket statistics to /rpcz
    
    This adds the ability to fetch various bits of socket-level information
    for each RPC connection and publish the info into /rpcz. The information
    itself is fetched using getsockopt(TCP_INFO) as well as ioctls to check
    the current send and receive queue lengths.
    
    This data can help resolve whether a use case is network bound or bound
    by the application itself. For example, a high number of retransmitted
    packets can indicate that the network path to the receiver is
    overloaded.
    
    Eventually we may want to expose some of this information on a per-call
    basis. However, doing so is quite tricky, since 'send()' completes when
    the data has been placed into the outbound packet queue and doesn't wait
    until the data is ACKed. We'd need to defer checking for retransmissions
    until all of the data has been ACKed, which is at some indeterminate
    point in the future. The very newest kernels allow subscribing to such
    notifications (along with lots of interesting stats) but, given none of
    that is available in el7, it's probably not worth tackling at this
    point.
    
    Change-Id: I552c9dd80c0730ccd6bf7b13bb63761744a854c2
    Reviewed-on: http://gerrit.cloudera.org:8080/12184
    Reviewed-by: Will Berkeley <wd...@gmail.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/rpc/connection.cc           | 226 ++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/connection.h            |   7 +-
 src/kudu/rpc/inbound_call.cc         |   2 +-
 src/kudu/rpc/inbound_call.h          |   4 +-
 src/kudu/rpc/messenger.cc            |   6 +-
 src/kudu/rpc/messenger.h             |  10 +-
 src/kudu/rpc/outbound_call.cc        |   2 +-
 src/kudu/rpc/outbound_call.h         |   4 +-
 src/kudu/rpc/reactor.cc              |  12 +-
 src/kudu/rpc/reactor.h               |  12 +-
 src/kudu/rpc/rpc-test.cc             |  21 +++-
 src/kudu/rpc/rpc_introspection.proto |  34 +++++-
 src/kudu/rpc/rpc_stub-test.cc        |   8 +-
 src/kudu/server/rpcz-path-handler.cc |  10 +-
 14 files changed, 313 insertions(+), 45 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 1632dd3..7a32a30 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -17,6 +17,9 @@
 
 #include "kudu/rpc/connection.h"
 
+#include <netinet/in.h>
+#include <string.h>
+
 #include <algorithm>
 #include <cerrno>
 #include <iostream>
@@ -31,7 +34,6 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
-#include "kudu/util/slice.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
@@ -44,8 +46,15 @@
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+#ifdef __linux__
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <linux/tcp.h>
+#endif
+
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -57,6 +66,132 @@ namespace rpc {
 
 typedef OutboundCall::Phase Phase;
 
+namespace {
+
+// tcp_info struct duplicated from linux/tcp.h.
+//
+// This allows us to decouple the compile-time Linux headers from the
+// runtime Linux kernel. The compile-time headers (and kernel) might be
+// older than the runtime kernel, in which case an ifdef-based approach
+// wouldn't allow us to get all of the info available.
+//
+// NOTE: this struct has been annotated with some local notes about the
+// contents of each field.
+struct tcp_info {
+  // Various state-tracking information.
+  // ------------------------------------------------------------
+  uint8_t    tcpi_state;
+  uint8_t    tcpi_ca_state;
+  uint8_t    tcpi_retransmits;
+  uint8_t    tcpi_probes;
+  uint8_t    tcpi_backoff;
+  uint8_t    tcpi_options;
+  uint8_t    tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
+  uint8_t    tcpi_delivery_rate_app_limited:1;
+
+  // Configurations.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_rto;
+  uint32_t   tcpi_ato;
+  uint32_t   tcpi_snd_mss;
+  uint32_t   tcpi_rcv_mss;
+
+  // Counts of packets in various states in the outbound queue.
+  // At first glance one might think these are monotonic counters, but
+  // in fact they are instantaneous counts of queued packets and thus
+  // not very useful for our purposes.
+  // ------------------------------------------------------------
+  // Number of packets outstanding that haven't been acked.
+  uint32_t   tcpi_unacked;
+
+  // Number of packets outstanding that have been selective-acked.
+  uint32_t   tcpi_sacked;
+
+  // Number of packets outstanding that have been deemed lost (a SACK arrived
+  // for a later packet)
+  uint32_t   tcpi_lost;
+
+  // Number of packets in the queue that have been retransmitted.
+  uint32_t   tcpi_retrans;
+
+  // The number of packets towards the highest SACKed sequence number
+  // (some measure of reording, removed in later Linux versions by
+  // 737ff314563ca27f044f9a3a041e9d42491ef7ce)
+  uint32_t   tcpi_fackets;
+
+  // Times when various events occurred.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_last_data_sent;
+  uint32_t   tcpi_last_ack_sent;     /* Not remembered, sorry. */
+  uint32_t   tcpi_last_data_recv;
+  uint32_t   tcpi_last_ack_recv;
+
+  // Path MTU.
+  uint32_t   tcpi_pmtu;
+
+  // Receiver slow start threshold.
+  uint32_t   tcpi_rcv_ssthresh;
+
+  // Smoothed RTT estimate and variance based on the time between sending data and receiving
+  // corresponding ACK. See https://tools.ietf.org/html/rfc2988 for details.
+  uint32_t   tcpi_rtt;
+  uint32_t   tcpi_rttvar;
+
+  // Slow start threshold.
+  uint32_t   tcpi_snd_ssthresh;
+  // Sender congestion window (in number of MSS-sized packets)
+  uint32_t   tcpi_snd_cwnd;
+  // Advertised MSS.
+  uint32_t   tcpi_advmss;
+  // Amount of packet reordering allowed.
+  uint32_t   tcpi_reordering;
+
+  // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
+  //
+  // "A system that is only transmitting acknowledgements can still estimate the round-trip
+  // time by observing the time between when a byte is first acknowledged and the receipt of
+  // data that is at least one window beyond the sequence number that was acknowledged. If the
+  // sender is being throttled by the network, this estimate will be valid. However, if the
+  // sending application did not have any data to send, the measured time could be much larger
+  // than the actual round-trip time. Thus this measurement acts only as an upper-bound on the
+  // round-trip time and should be be used only when it is the only source of round-trip time
+  // information."
+  uint32_t   tcpi_rcv_rtt;
+  uint32_t   tcpi_rcv_space;
+
+  // Total number of retransmitted packets.
+  uint32_t   tcpi_total_retrans;
+
+  // Pacing-related metrics.
+  uint64_t   tcpi_pacing_rate;
+  uint64_t   tcpi_max_pacing_rate;
+
+  // Total bytes ACKed by remote peer.
+  uint64_t   tcpi_bytes_acked;    /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
+  // Total bytes received (for which ACKs have been sent out).
+  uint64_t   tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
+  // Segments sent and received.
+  uint32_t   tcpi_segs_out;       /* RFC4898 tcpEStatsPerfSegsOut */
+  uint32_t   tcpi_segs_in;        /* RFC4898 tcpEStatsPerfSegsIn */
+
+  // The following metrics are quite new and not in el7.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_notsent_bytes;
+  uint32_t   tcpi_min_rtt;
+  uint32_t   tcpi_data_segs_in;      /* RFC4898 tcpEStatsDataSegsIn */
+  uint32_t   tcpi_data_segs_out;     /* RFC4898 tcpEStatsDataSegsOut */
+
+  // Calculated rate at which data was delivered.
+  uint64_t   tcpi_delivery_rate;
+
+  // Timers for various states.
+  uint64_t   tcpi_busy_time;      /* Time (usec) busy sending data */
+  uint64_t   tcpi_rwnd_limited;   /* Time (usec) limited by receive window */
+  uint64_t   tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
+};
+
+} // anonymous namespace
+
 ///
 /// Connection
 ///
@@ -728,7 +863,7 @@ void Connection::MarkNegotiationComplete() {
   negotiation_complete_ = true;
 }
 
-Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
+Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcConnectionPB* resp) {
   DCHECK(reactor_thread_->IsCurrentThread());
   resp->set_remote_ip(remote_.ToString());
@@ -760,8 +895,95 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
   } else {
     LOG(FATAL);
   }
+#ifdef __linux__
+  if (negotiation_complete_) {
+    // TODO(todd): it's a little strange to not set socket level stats during
+    // negotiation, but we don't have access to the socket here until negotiation
+    // is complete.
+    WARN_NOT_OK(GetSocketStatsPB(resp->mutable_socket_stats()),
+                "could not fill in TCP info for RPC connection");
+  }
+#endif // __linux__
+  return Status::OK();
+}
+
+#ifdef __linux__
+Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  int fd = socket_->GetFd();
+  CHECK_GE(fd, 0);
+
+  // Fetch TCP_INFO statistics from the kernel.
+  tcp_info ti;
+  memset(&ti, 0, sizeof(ti));
+  socklen_t len = sizeof(ti);
+  int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
+  if (rc == 0) {
+#   define HAS_FIELD(field_name) \
+        (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name))
+    if (!HAS_FIELD(tcpi_total_retrans)) {
+      // All the fields up through tcpi_total_retrans were present since very old
+      // kernel versions, beyond our minimal supported. So, we can just bail if we
+      // don't get sufficient data back.
+      return Status::NotSupported("bad length returned for TCP_INFO");
+    }
+
+    pb->set_rtt(ti.tcpi_rtt);
+    pb->set_rttvar(ti.tcpi_rttvar);
+    pb->set_snd_cwnd(ti.tcpi_snd_cwnd);
+    pb->set_total_retrans(ti.tcpi_total_retrans);
+
+    // The following fields were added later in kernel development history.
+    // In RHEL6 they were backported starting in 6.8. Even though they were
+    // backported all together as a group, we'll just be safe and check for
+    // each individually.
+    if (HAS_FIELD(tcpi_pacing_rate)) {
+      pb->set_pacing_rate(ti.tcpi_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_max_pacing_rate)) {
+      pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_bytes_acked)) {
+      pb->set_bytes_acked(ti.tcpi_bytes_acked);
+    }
+    if (HAS_FIELD(tcpi_bytes_received)) {
+      pb->set_bytes_received(ti.tcpi_bytes_received);
+    }
+    if (HAS_FIELD(tcpi_segs_out)) {
+      pb->set_segs_out(ti.tcpi_segs_out);
+    }
+    if (HAS_FIELD(tcpi_segs_in)) {
+      pb->set_segs_in(ti.tcpi_segs_in);
+    }
+
+    // Calculate sender bandwidth based on the same logic used by the 'ss' utility.
+    if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) {
+      // Units:
+      //  rtt = usec
+      //  cwnd = number of MSS-size packets
+      //  mss = bytes / packet
+      //
+      // Dimensional analysis:
+      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
+      static constexpr int kUsecsPerSec = 1000000;
+      pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) *
+                                 ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt);
+    }
+  }
+
+  // Fetch the queue sizes.
+  int queue_len = 0;
+  rc = ioctl(fd, TIOCOUTQ, &queue_len);
+  if (rc == 0) {
+    pb->set_send_queue_bytes(queue_len);
+  }
+  rc = ioctl(fd, FIONREAD, &queue_len);
+  if (rc == 0) {
+    pb->set_receive_queue_bytes(queue_len);
+  }
   return Status::OK();
 }
+#endif // __linux__
 
 } // namespace rpc
 } // namespace kudu
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 362a35b..9fb6a6c 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -50,12 +50,13 @@ namespace kudu {
 
 namespace rpc {
 
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class InboundCall;
 class OutboundCall;
 class RpcConnectionPB;
 class ReactorThread;
 class RpczStore;
+class SocketStatsPB;
 enum class CredentialsPolicy;
 
 //
@@ -193,7 +194,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
   void MarkNegotiationComplete();
 
-  Status DumpPB(const DumpRunningRpcsRequestPB& req,
+  Status DumpPB(const DumpConnectionsRequestPB& req,
                 RpcConnectionPB* resp);
 
   ReactorThread* reactor_thread() const { return reactor_thread_; }
@@ -298,6 +299,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
   void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
 
+  Status GetSocketStatsPB(SocketStatsPB* pb) const;
+
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 6920071..655c453 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -250,7 +250,7 @@ string InboundCall::ToString() const {
                       header_.call_id());
 }
 
-void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void InboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                          RpcCallInProgressPB* resp) {
   resp->mutable_header()->CopyFrom(header_);
   if (req.include_traces() && trace_) {
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 0db4c37..07c57dc 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -54,7 +54,7 @@ class Trace;
 namespace rpc {
 
 class Connection;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RemoteUser;
 class RpcCallInProgressPB;
 class RpcSidecar;
@@ -135,7 +135,7 @@ class InboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   const RemoteUser& remote_user() const;
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 17ac0c5..4129172 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -449,11 +449,11 @@ Status Messenger::Init() {
   return Status::OK();
 }
 
-Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                  DumpRunningRpcsResponsePB* resp) {
+Status Messenger::DumpConnections(const DumpConnectionsRequestPB& req,
+                                  DumpConnectionsResponsePB* resp) {
   shared_lock<rw_spinlock> guard(lock_.get_lock());
   for (Reactor* reactor : reactors_) {
-    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
+    RETURN_NOT_OK(reactor->DumpConnections(req, resp));
   }
   return Status::OK();
 }
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index b3a78e0..56b087c 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -60,8 +60,8 @@ using security::RpcAuthentication;
 using security::RpcEncryption;
 
 class AcceptorPool;
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class InboundCall;
 class Messenger;
 class OutboundCall;
@@ -265,9 +265,9 @@ class Messenger {
   // Take ownership of the socket via Socket::Release
   void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
 
-  // Dump the current RPCs into the given protobuf.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  // Dump info on related TCP connections into the given protobuf.
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Run 'func' on a reactor thread after 'when' time elapses.
   //
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 37d02ac..17761f5 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -449,7 +449,7 @@ string OutboundCall::ToString() const {
   return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
 }
 
-void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void OutboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcCallInProgressPB* resp) {
   std::lock_guard<simple_spinlock> l(lock_);
   resp->mutable_header()->CopyFrom(header_);
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 8d43891..c48e496 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -55,7 +55,7 @@ namespace kudu {
 namespace rpc {
 
 class CallResponse;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RpcCallInProgressPB;
 class RpcController;
 class RpcSidecar;
@@ -154,7 +154,7 @@ class OutboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   ////////////////////////////////////////////////////////////
   // Getters
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index c1832ef..b9308b5 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -273,8 +273,8 @@ Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
   return Status::OK();
 }
 
-Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                      DumpRunningRpcsResponsePB* resp) {
+Status ReactorThread::DumpConnections(const DumpConnectionsRequestPB& req,
+                                      DumpConnectionsResponsePB* resp) {
   DCHECK(IsCurrentThread());
   for (const scoped_refptr<Connection>& conn : server_conns_) {
     RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
@@ -800,10 +800,10 @@ Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
   return task.Wait();
 }
 
-Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                DumpRunningRpcsResponsePB* resp) {
-  return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
-                                        boost::ref(req), resp));
+Status Reactor::DumpConnections(const DumpConnectionsRequestPB& req,
+                                DumpConnectionsResponsePB* resp) {
+  return RunOnReactorThread(boost::bind(&ReactorThread::DumpConnections,
+                                        &thread_, boost::ref(req), resp));
 }
 
 class RegisterConnectionTask : public ReactorTask {
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index ce251c1..a74a3bb 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -49,8 +49,8 @@ namespace rpc {
 
 typedef std::list<scoped_refptr<Connection>> conn_list_t;
 
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class OutboundCall;
 class Reactor;
 class ReactorThread;
@@ -149,8 +149,8 @@ class ReactorThread {
   Status Init();
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Shuts down a reactor thread, optionally waiting for it to exit.
   // Reactor::Shutdown() must have been called already.
@@ -356,8 +356,8 @@ class Reactor {
   Status GetMetrics(ReactorMetrics *metrics);
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Queue a new incoming connection. Takes ownership of the underlying fd from
   // 'socket', but not the Socket object itself.
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 2af84a1..0d70bbf 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -534,12 +534,25 @@ TEST_P(TestRpc, TestClientConnectionMetrics) {
     });
 
     // Test the OutboundTransfer queue.
-    DumpRunningRpcsRequestPB dump_req;
-    DumpRunningRpcsResponsePB dump_resp;
+    DumpConnectionsRequestPB dump_req;
+    DumpConnectionsResponsePB dump_resp;
     dump_req.set_include_traces(false);
-    ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_OK(client_messenger->DumpConnections(dump_req, &dump_resp));
     ASSERT_EQ(1, dump_resp.outbound_connections_size());
-    ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+    const auto& conn = dump_resp.outbound_connections(0);
+    ASSERT_GT(conn.outbound_queue_size(), 0);
+
+#ifdef __linux__
+    // Test that the socket statistics are present. We only assert on those that
+    // we know to be present on all kernel versions.
+    ASSERT_TRUE(conn.has_socket_stats());
+    ASSERT_GT(conn.socket_stats().rtt(), 0);
+    ASSERT_GT(conn.socket_stats().rttvar(), 0);
+    ASSERT_GT(conn.socket_stats().snd_cwnd(), 0);
+    ASSERT_GT(conn.socket_stats().send_bytes_per_sec(), 0);
+    ASSERT_TRUE(conn.socket_stats().has_send_queue_bytes());
+    ASSERT_TRUE(conn.socket_stats().has_receive_queue_bytes());
+#endif
 
     // Unblock all of the calls and wait for them to finish.
     latch.Wait();
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index 7685903..05be722 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -50,6 +50,33 @@ message RpcCallInProgressPB {
   optional State state = 4;
 }
 
+// The SocketStatsPB message is used to report on socket-level information
+// for RPC-related TCP connections (Linux-only). Essentially, the message
+// contains some metrics and counters from Linux-specific 'tcp_info' structure
+// defined in /usr/include/linux/tcp.h. For more information on the TCP on
+// Linux, see http://man7.org/linux/man-pages/man7/tcp.7.html
+message SocketStatsPB {
+  optional uint32 rtt = 1;
+  optional uint32 rttvar = 2;
+  optional uint32 snd_cwnd = 3;
+  optional uint32 total_retrans = 4;
+
+  optional uint32 pacing_rate = 5;
+  optional uint32 max_pacing_rate = 6;
+
+  optional uint64 bytes_acked = 7;
+  optional uint64 bytes_received = 8;
+  optional uint32 segs_out = 9;
+  optional uint32 segs_in = 10;
+
+  optional uint64 send_queue_bytes = 11;
+  optional uint64 receive_queue_bytes = 12;
+
+  // Calculated sender throughput.
+  optional int32 send_bytes_per_sec = 13;
+};
+
+// Debugging information about a currently-open RPC connection.
 message RpcConnectionPB {
   enum StateType {
     UNKNOWN = 999;
@@ -63,13 +90,16 @@ message RpcConnectionPB {
   optional string remote_user_credentials = 3;
   repeated RpcCallInProgressPB calls_in_flight = 4;
   optional int64 outbound_queue_size = 5;
+
+  // Information on the actual TCP connection as reported by the kernel.
+  optional SocketStatsPB socket_stats = 6;
 }
 
-message DumpRunningRpcsRequestPB {
+message DumpConnectionsRequestPB {
   optional bool include_traces = 1 [ default = false ];
 }
 
-message DumpRunningRpcsResponsePB {
+message DumpConnectionsResponsePB {
   repeated RpcConnectionPB inbound_connections = 1;
   repeated RpcConnectionPB outbound_connections = 2;
 }
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index dc92f2e..f7ba53a 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -593,11 +593,11 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
                boost::bind(&CountDownLatch::CountDown, &sleep.latch));
 
   // Check the running RPC status on the client messenger.
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
+  DumpConnectionsRequestPB dump_req;
+  DumpConnectionsResponsePB dump_resp;
   dump_req.set_include_traces(true);
 
-  ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+  ASSERT_OK(client_messenger_->DumpConnections(dump_req, &dump_resp));
   LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
   ASSERT_EQ(1, dump_resp.outbound_connections_size());
   ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
@@ -610,7 +610,7 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
   // asynchronously off of the main thread (ie the server may not be handling it yet)
   for (int i = 0; i < 100; i++) {
     dump_resp.Clear();
-    ASSERT_OK(server_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_OK(server_messenger_->DumpConnections(dump_req, &dump_resp));
     if (dump_resp.inbound_connections_size() > 0 &&
         dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
       break;
diff --git a/src/kudu/server/rpcz-path-handler.cc b/src/kudu/server/rpcz-path-handler.cc
index 80a2840..97f8faa 100644
--- a/src/kudu/server/rpcz-path-handler.cc
+++ b/src/kudu/server/rpcz-path-handler.cc
@@ -33,8 +33,8 @@
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/web_callback_registry.h"
 
-using kudu::rpc::DumpRunningRpcsRequestPB;
-using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::DumpConnectionsRequestPB;
+using kudu::rpc::DumpConnectionsResponsePB;
 using kudu::rpc::DumpRpczStoreRequestPB;
 using kudu::rpc::DumpRpczStoreResponsePB;
 using kudu::rpc::Messenger;
@@ -49,14 +49,14 @@ namespace {
 void RpczPathHandler(const shared_ptr<Messenger>& messenger,
                      const Webserver::WebRequest& req,
                      Webserver::PrerenderedWebResponse* resp) {
-  DumpRunningRpcsResponsePB running_rpcs;
+  DumpConnectionsResponsePB running_rpcs;
   {
-    DumpRunningRpcsRequestPB dump_req;
+    DumpConnectionsRequestPB dump_req;
 
     string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
     dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
 
-    messenger->DumpRunningRpcs(dump_req, &running_rpcs);
+    messenger->DumpConnections(dump_req, &running_rpcs);
   }
   DumpRpczStoreResponsePB sampled_rpcs;
   {


[kudu] 03/05: KUDU-2662 TestKuduClient.testClientLocation is flaky

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1f1f875cfb6a26b55821edd236f81f28ce06759b
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Mon Jan 28 14:16:52 2019 -0800

    KUDU-2662 TestKuduClient.testClientLocation is flaky
    
    At least on Linux, multimaster Kudu tests are run with each master on
    its own interface. E.g., in a failed run of testClientLocation, the
    three masters were running on 127.26.241.{124,125,126}. Each client is
    assigned a location by each master it connects to. The trick was, the
    client connection appeared to come from 127.26.241.X for the master on
    127.26.241.X. assign-locations.py remembers assigned locations by IP,
    so it was attempting to assign 3 locations for one client, plus 3 for
    the tablet servers. The test wrongly anticipated 4 assignments and
    configured the script to only do 4 assignments, leading to issues like
    
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541) W0126 16:05:46.987164 27624 master_service.cc:535] unable to assign location to client {username='jenkins'}@127.26.241.126:60372: Runtime error: failed to run location mapping command: Runtime error: /data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py: process exited with non-zero status 1: Traceback (most recent call last):
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 244, in <module>
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     main()
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 239, in main
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     LocationAssignmentRule(args.location_mapping_rules), args.uid, args.relaxed)
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 198, in get_location
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     seq + 1, uid, rule.location_mapping_rules, json.dumps(state)))
    16:05:46.987 [INFO - cluster stderr printer] (MiniKuduCluster.java:541) Exception: too many unique identifiers (5) to assign next location to 127.26.241.126 using mapping rules ['/L0:4']. State: {"mapping_rules": ["/L0:4"], "mappings": {"127.26.241.124": "/L0", "127.26.241.65": "/L0", "127.26.241.66": "/L0", "127.26.241.67": "/L0"}, "seq": 4}
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541) W0126 16:05:47.073348 27688 master_service.cc:535] unable to assign location to client {username='jenkins'}@127.26.241.125:41204: Runtime error: failed to run location mapping command: Runtime error: /data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py: process exited with non-zero status 1: Traceback (most recent call last):
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 244, in <module>
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     main()
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 239, in main
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     LocationAssignmentRule(args.location_mapping_rules), args.uid, args.relaxed)
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)   File "/data/somelongdirectorytoavoidrpathissues/src/kudu/java/kudu-client/build/resources/test/assign-location.py", line 198, in get_location
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541)     seq + 1, uid, rule.location_mapping_rules, json.dumps(state)))
    16:05:47.073 [INFO - cluster stderr printer] (MiniKuduCluster.java:541) Exception: too many unique identifiers (5) to assign next location to 127.26.241.125 using mapping rules ['/L0:4']. State: {"mapping_rules": ["/L0:4"], "mappings": {"127.26.241.124": "/L0", "127.26.241.65": "/L0", "127.26.241.66": "/L0", "127.26.241.67": "/L0"}, "seq": 4}
    
    This didn't cause a test failure in and of itself- the test would still
    pass if the leader master was first to try and assign a location to the
    client. So the test would randomly fail.
    
    The solution is just to up the number of allowed assignments. Since our
    Java tests are very simple and assign a single location only, this will
    avoid the problem.
    
    Change-Id: I2fed3eb62ab9d0f8c1baaa40726355a2a6737b3a
    Reviewed-on: http://gerrit.cloudera.org:8080/12289
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../src/test/java/org/apache/kudu/client/TestKuduClient.java  |  2 +-
 .../src/main/java/org/apache/kudu/test/KuduTestHarness.java   | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index b849e9f..e11a58e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1168,7 +1168,7 @@ public class TestKuduClient {
 
   @Test(timeout = 100000)
   @LocationConfig(locations = {
-      "/L0:4",
+      "/L0:6", // 3 masters, 1 client, 3 tablet servers: 3 * 1 + 3 = 6.
   })
   public void testClientLocation() throws Exception {
     // Do something that will cause the client to connect to the cluster.
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index 44f6a22..a462853 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -437,6 +437,17 @@ public class KuduTestHarness extends ExternalResource {
    * define a location mapping for the cluster. Location
    * mappings are defined as a series of 'location:number'
    * pairs.
+   *
+   * Note that, in many Kudu tests, multiple masters will be run, each
+   * on their own network interface within the same machine, and client
+   * connections will appear to come from the same interface as the
+   * master being connected to. So, for example, if there are two
+   * clients, three masters, and three tablet servers, nine locations
+   * will be assigned: each client will get a location from each
+   * master (from a different IP), and each tablet server will get a
+   * location. The easiest way to work around this for our simple
+   * Java client tests is to set the number of mappings to be something
+   * at least (# masters) * (# clients) + (# tablet servers)
    */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})


[kudu] 04/05: [java] deflake RYW tests in TestKuduClient

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c7c902a08b8229612078357c7bb65927e4e4dbac
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Sun Jan 27 12:48:04 2019 -0800

    [java] deflake RYW tests in TestKuduClient
    
    RYW tests in TestKuduClient is flaky which fails with AssertionError
    when verifying the chosen snapshot timestamp returned from the server
    is larger than the previous propagated timestamp before the scan.
    
    It turns out to be a test only issue, which due to incorrect structure
    of the test. It aims to test READ_YOUR_WRITE mode for multiple clients,
    while the same client was being used. The distinction is important,
    because we want to ensure a single client can still preserve
    read-your-writes and read-your-reads session guarantees even there are
    concurrent reads/writes performed by other clients in READ_YOUR_WRITE
    scan mode.
    
    Without the fix, 192/1000 runs of TestKuduClient failed with this error.
    With the fix, 0/1000 runs of TestKuduClient failed.
    
    Change-Id: I951abb9197f7e6b6a4c70cdf89948206840ddeda
    Reviewed-on: http://gerrit.cloudera.org:8080/12276
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../java/org/apache/kudu/client/AsyncKuduScanner.java |  4 ++--
 .../java/org/apache/kudu/client/TestKuduClient.java   | 19 +++++++++++++------
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 71b1146..36bb0af 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -421,8 +421,8 @@ public final class AsyncKuduScanner {
               resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
             // If the server-assigned timestamp is present in the tablet
             // server's response, store it in the scanner. The stored value
-            // is used for read operations at other tablet servers in the
-            // context of the same scan.
+            // is used for read operations in READ_AT_SNAPSHOT mode at
+            // other tablet servers in the context of the same scan.
             htTimestamp = resp.scanTimestamp;
           }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index e11a58e..930abf2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1056,11 +1056,17 @@ public class TestKuduClient {
       Callable<Void> callable = new Callable<Void>() {
         @Override
         public Void call() throws Exception {
+          // Create a new client.
+          AsyncKuduClient asyncKuduClient = new AsyncKuduClient
+                  .AsyncKuduClientBuilder(harness.getMasterAddressesAsString())
+                  .defaultAdminOperationTimeoutMs(harness.DEFAULT_SLEEP)
+                  .build();
           // From the same client continuously performs inserts to a tablet
           // in the given flush mode.
-          KuduSession session = client.newSession();
+          KuduClient kuduClient = asyncKuduClient.syncClient();
+          KuduSession session = kuduClient.newSession();
           session.setFlushMode(flushMode);
-          KuduTable table = client.openTable(TABLE_NAME);
+          KuduTable table = kuduClient.openTable(TABLE_NAME);
           for (int i = 0; i < 3; i++) {
             for (int j = 100 * i; j < 100 * (i + 1); j++) {
               Insert insert = table.newInsert();
@@ -1079,14 +1085,13 @@ public class TestKuduClient {
             // reads will not "go back in time" regarding writes that other
             // clients have done.
             for (int k = 0; k < 3; k++) {
-              AsyncKuduScanner scanner = asyncClient.newScannerBuilder(table)
+              AsyncKuduScanner scanner = asyncKuduClient.newScannerBuilder(table)
                       .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
                       .replicaSelection(replicaSelection)
                       .build();
               KuduScanner syncScanner = new KuduScanner(scanner);
-              long preTs = asyncClient.getLastPropagatedTimestamp();
-              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP,
-                  asyncClient.getLastPropagatedTimestamp());
+              long preTs = asyncKuduClient.getLastPropagatedTimestamp();
+              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
 
               long row_count = countRowsInScan(syncScanner);
               long expected_count = 100L * (i + 1);
@@ -1100,6 +1105,8 @@ public class TestKuduClient {
               syncScanner.close();
             }
           }
+
+          kuduClient.close();
           return null;
         }
       };