You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/02/01 00:26:39 UTC

[kudu] 02/02: [java] better client and minicluster cleanup after tests finish

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 919a4814da26d70644d1fb8f356027a879917438
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jan 30 17:14:28 2019 -0800

    [java] better client and minicluster cleanup after tests finish
    
    Many tests create their own client and minicluster instances, but don't take
    care to close them properly in all cases. This patch fixes that by:
    1. Converting all temporary instantiations to use try-with-resources.
    2. For non-temporary instantiations, ensuring that close is always called,
       even if an exception is thrown mid-cleanup.
    
    The most interesting change is probably to KuduContext.scala, where we now
    close clients when clearing out the KuduClientCache (called by an @After).
    To avoid double closing, we need to unregister a client's shutdown hook
    after closing it.
    
    Change-Id: If91b3b2787915f0361537dd999e0daa8f7cb0a36
    Reviewed-on: http://gerrit.cloudera.org:8080/12322
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../apache/kudu/client/TestAsyncKuduClient.java    |  16 +-
 .../apache/kudu/client/TestConnectToCluster.java   |  41 ++---
 .../apache/kudu/client/TestConnectionCache.java    |  15 +-
 .../org/apache/kudu/client/TestKuduClient.java     | 183 +++++++++++----------
 .../java/org/apache/kudu/client/TestSecurity.java  |  49 +++---
 .../java/org/apache/kudu/client/TestTimeouts.java  |  48 +++---
 .../java/org/apache/kudu/flume/sink/KuduSink.java  |   7 +-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   |  35 +++-
 .../java/org/apache/kudu/test/KuduTestHarness.java |   2 +-
 .../org/apache/kudu/test/TestMiniKuduCluster.java  |   8 +-
 10 files changed, 200 insertions(+), 204 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index a622459..d79d114 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -143,13 +143,15 @@ public class TestAsyncKuduClient {
     final int requestBatchSize = 10;
 
     // Test that a bad hostname for the master makes us error out quickly.
-    AsyncKuduClient invalidClient = new AsyncKuduClient.AsyncKuduClientBuilder(badHostname).build();
-    try {
-      invalidClient.listTabletServers().join(1000);
-      fail("This should have failed quickly");
-    } catch (Exception ex) {
-      assertTrue(ex instanceof NonRecoverableException);
-      assertTrue(ex.getMessage().contains(badHostname));
+    try (AsyncKuduClient invalidClient =
+           new AsyncKuduClient.AsyncKuduClientBuilder(badHostname).build()) {
+      try {
+        invalidClient.listTabletServers().join(1000);
+        fail("This should have failed quickly");
+      } catch (Exception ex) {
+        assertTrue(ex instanceof NonRecoverableException);
+        assertTrue(ex.getMessage().contains(badHostname));
+      }
     }
 
     List<Master.TabletLocationsPB> tabletLocations = new ArrayList<>();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index bb73585..c755f63 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -47,23 +47,16 @@ public class TestConnectToCluster {
    */
   @Test(timeout=60000)
   public void testFallbackConnectRpc() throws Exception {
-    MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-        .addMasterServerFlag("--master_support_connect_to_master_rpc=0")
-        .numMasterServers(1)
-        .numTabletServers(0)
-        .build();
-    KuduClient c = null;
-    try {
-      c = new KuduClient.KuduClientBuilder(cluster.getMasterAddressesAsString())
-          .build();
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+         .addMasterServerFlag("--master_support_connect_to_master_rpc=0")
+         .numMasterServers(1)
+         .numTabletServers(0)
+         .build();
+         KuduClient c = new KuduClient.KuduClientBuilder(cluster.getMasterAddressesAsString())
+         .build()) {
       // Call some method which uses the master. This forces us to connect
       // and verifies that the fallback works.
       c.listTabletServers();
-    } finally {
-      if (c != null) {
-        c.close();
-      }
-      cluster.shutdown();
     }
   }
 
@@ -75,18 +68,15 @@ public class TestConnectToCluster {
    */
   @Test(timeout=60000)
   public void testConnectToOneOfManyMasters() throws Exception {
-    MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-        .numMasterServers(3)
-        .numTabletServers(0)
-        .build();
     int successes = 0;
-    try {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+         .numMasterServers(3)
+         .numTabletServers(0)
+         .build()) {
       String[] masterAddrs = cluster.getMasterAddressesAsString().split(",", -1);
       assertEquals(3, masterAddrs.length);
       for (String masterAddr : masterAddrs) {
-        KuduClient c = null;
-        try {
-          c = new KuduClient.KuduClientBuilder(masterAddr).build();
+        try (KuduClient c = new KuduClient.KuduClientBuilder(masterAddr).build()) {
           // Call some method which uses the master. This forces us to connect.
           c.listTabletServers();
           successes++;
@@ -98,15 +88,10 @@ public class TestConnectToCluster {
                   "\\(.+?,.+?,.+?\\).*"));
           Assert.assertThat(Joiner.on("\n").join(e.getStackTrace()),
               CoreMatchers.containsString("testConnectToOneOfManyMasters"));
-        } finally {
-          if (c != null) {
-            c.close();
-          }
         }
       }
-    } finally {
-      cluster.shutdown();
     }
+
     // Typically, one of the connections will have succeeded. However, it's possible
     // that 0 succeeded in the case that the masters were slow at electing
     // themselves.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 134b814..ead72b2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -37,12 +37,11 @@ public class TestConnectionCache {
 
   @Test(timeout = 50000)
   public void test() throws Exception {
-    MiniKuduCluster cluster = null;
-    try {
-      cluster = new MiniKuduCluster.MiniKuduClusterBuilder().numMasterServers(3).build();
-
-      final AsyncKuduClient client =
-          new AsyncKuduClient.AsyncKuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(3)
+                                                      .build();
+         AsyncKuduClient client = new AsyncKuduClient.AsyncKuduClientBuilder(
+             cluster.getMasterAddressesAsString()).build()) {
       // Below we ping the masters directly using RpcProxy, so if they aren't ready to process
       // RPCs we'll get an error. Here by listing the tables we make sure this won't happen since
       // it won't return until a master leader is found.
@@ -94,10 +93,6 @@ public class TestConnectionCache {
         waitForConnectionToTerminate(c);
       }
       assertTrue(allConnectionsTerminated(client));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 930abf2..303f53a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -877,40 +877,40 @@ public class TestKuduClient {
   @Test(timeout = 100000)
   public void testCustomNioExecutor() throws Exception {
     long startTime = System.nanoTime();
-    final KuduClient localClient = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-        .nioExecutors(Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(2))
-        .bossCount(1)
-        .workerCount(2)
-        .build();
-    long buildTime = (System.nanoTime() - startTime) / 1000000000L;
-    assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
-    localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
-    Thread[] threads = new Thread[4];
-    for (int t = 0; t < 4; t++) {
-      final int id = t;
-      threads[t] = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            KuduTable table = localClient.openTable(TABLE_NAME);
-            KuduSession session = localClient.newSession();
-            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
-            for (int i = 0; i < 100; i++) {
-              Insert insert = createBasicSchemaInsert(table, id * 100 + i);
-              session.apply(insert);
+    try (KuduClient localClient = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+         .nioExecutors(Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(2))
+         .bossCount(1)
+         .workerCount(2)
+         .build()) {
+      long buildTime = (System.nanoTime() - startTime) / 1000000000L;
+      assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
+      localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+      Thread[] threads = new Thread[4];
+      for (int t = 0; t < 4; t++) {
+        final int id = t;
+        threads[t] = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              KuduTable table = localClient.openTable(TABLE_NAME);
+              KuduSession session = localClient.newSession();
+              session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+              for (int i = 0; i < 100; i++) {
+                Insert insert = createBasicSchemaInsert(table, id * 100 + i);
+                session.apply(insert);
+              }
+              session.close();
+            } catch (Exception e) {
+              fail("insert thread should not throw exception: " + e);
             }
-            session.close();
-          } catch (Exception e) {
-            fail("insert thread should not throw exception: " + e);
           }
-        }
-      });
-      threads[t].start();
-    }
-    for (int t = 0; t< 4;t++) {
-      threads[t].join();
+        });
+        threads[t].start();
+      }
+      for (int t = 0; t< 4;t++) {
+        threads[t].join();
+      }
     }
-    localClient.shutdown();
   }
 
   @Test(expected=IllegalArgumentException.class)
@@ -1063,50 +1063,49 @@ public class TestKuduClient {
                   .build();
           // From the same client continuously performs inserts to a tablet
           // in the given flush mode.
-          KuduClient kuduClient = asyncKuduClient.syncClient();
-          KuduSession session = kuduClient.newSession();
-          session.setFlushMode(flushMode);
-          KuduTable table = kuduClient.openTable(TABLE_NAME);
-          for (int i = 0; i < 3; i++) {
-            for (int j = 100 * i; j < 100 * (i + 1); j++) {
-              Insert insert = table.newInsert();
-              PartialRow row = insert.getRow();
-              row.addString("key", String.format("key_%02d", j));
-              row.addString("c1", "c1_" + j);
-              row.addString("c2", "c2_" + j);
-              row.addString("c3", "c3_" + j);
-              session.apply(insert);
-            }
-            session.flush();
-
-            // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
-            // that count the rows. And verify that the count of the rows
-            // never go down from what previously observed, to ensure subsequent
-            // reads will not "go back in time" regarding writes that other
-            // clients have done.
-            for (int k = 0; k < 3; k++) {
-              AsyncKuduScanner scanner = asyncKuduClient.newScannerBuilder(table)
-                      .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
-                      .replicaSelection(replicaSelection)
-                      .build();
-              KuduScanner syncScanner = new KuduScanner(scanner);
-              long preTs = asyncKuduClient.getLastPropagatedTimestamp();
-              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
-
-              long row_count = countRowsInScan(syncScanner);
-              long expected_count = 100L * (i + 1);
-              assertTrue(expected_count <= row_count);
-
-              // After the scan, verify that the chosen snapshot timestamp is
-              // returned from the server and it is larger than the previous
-              // propagated timestamp.
-              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
-              assertTrue(preTs < scanner.getSnapshotTimestamp());
-              syncScanner.close();
+          try (KuduClient kuduClient = asyncKuduClient.syncClient()) {
+            KuduSession session = kuduClient.newSession();
+            session.setFlushMode(flushMode);
+            KuduTable table = kuduClient.openTable(TABLE_NAME);
+            for (int i = 0; i < 3; i++) {
+              for (int j = 100 * i; j < 100 * (i + 1); j++) {
+                Insert insert = table.newInsert();
+                PartialRow row = insert.getRow();
+                row.addString("key", String.format("key_%02d", j));
+                row.addString("c1", "c1_" + j);
+                row.addString("c2", "c2_" + j);
+                row.addString("c3", "c3_" + j);
+                session.apply(insert);
+              }
+              session.flush();
+
+              // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
+              // that count the rows. And verify that the count of the rows
+              // never go down from what previously observed, to ensure subsequent
+              // reads will not "go back in time" regarding writes that other
+              // clients have done.
+              for (int k = 0; k < 3; k++) {
+                AsyncKuduScanner scanner = asyncKuduClient.newScannerBuilder(table)
+                                           .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+                                           .replicaSelection(replicaSelection)
+                                           .build();
+                KuduScanner syncScanner = new KuduScanner(scanner);
+                long preTs = asyncKuduClient.getLastPropagatedTimestamp();
+                assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
+
+                long row_count = countRowsInScan(syncScanner);
+                long expected_count = 100L * (i + 1);
+                assertTrue(expected_count <= row_count);
+
+                // After the scan, verify that the chosen snapshot timestamp is
+                // returned from the server and it is larger than the previous
+                // propagated timestamp.
+                assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+                assertTrue(preTs < scanner.getSnapshotTimestamp());
+                syncScanner.close();
+              }
             }
           }
-
-          kuduClient.close();
           return null;
         }
       };
@@ -1127,29 +1126,31 @@ public class TestKuduClient {
     Method methodToInvoke = KuduClient.class.getMethod(clientMethodName);
 
     for (int i = 0; i < 5; i++) {
-      KuduClient cl = new KuduClient.KuduClientBuilder(
-          harness.getMasterAddressesAsString()).build();
-      harness.restartLeaderMaster();
-
-      // There's a good chance that this executes while there's no leader
-      // master. It should retry until the leader election completes and a new
-      // leader master is elected.
-      methodToInvoke.invoke(cl);
+      try (KuduClient cl = new KuduClient.KuduClientBuilder(
+          harness.getMasterAddressesAsString()).build()) {
+        harness.restartLeaderMaster();
+
+        // There's a good chance that this executes while there's no leader
+        // master. It should retry until the leader election completes and a new
+        // leader master is elected.
+        methodToInvoke.invoke(cl);
+      }
     }
 
     // With all masters down, exportAuthenticationCredentials() should time out.
     harness.killAllMasterServers();
-    KuduClient cl = new KuduClient.KuduClientBuilder(
+    try (KuduClient cl = new KuduClient.KuduClientBuilder(
         harness.getMasterAddressesAsString())
-                    .defaultAdminOperationTimeoutMs(5000) // speed up the test
-                    .build();
-    try {
-      methodToInvoke.invoke(cl);
-      fail();
-    } catch (InvocationTargetException ex) {
-      assertTrue(ex.getTargetException() instanceof KuduException);
-      KuduException realEx = (KuduException)ex.getTargetException();
-      assertTrue(realEx.getStatus().isTimedOut());
+         .defaultAdminOperationTimeoutMs(5000) // speed up the test
+         .build()) {
+      try {
+        methodToInvoke.invoke(cl);
+        fail();
+      } catch (InvocationTargetException ex) {
+        assertTrue(ex.getTargetException() instanceof KuduException);
+        KuduException realEx = (KuduException)ex.getTargetException();
+        assertTrue(realEx.getStatus().isTimedOut());
+      }
     }
   }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 197e57c..210fa2a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -95,11 +95,14 @@ public class TestSecurity {
 
   @After
   public void tearDown() throws IOException {
-    if (client != null) {
-      client.close();
-    }
-    if (miniCluster != null) {
-      miniCluster.shutdown();
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
     }
   }
 
@@ -145,9 +148,8 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(
-          miniCluster.getMasterAddressesAsString()).build();
+    try (KuduClient newClient = new KuduClient.KuduClientBuilder(
+        miniCluster.getMasterAddressesAsString()).build()) {
 
       // Test that a client with no credentials cannot list servers.
       try {
@@ -206,8 +208,7 @@ public class TestSecurity {
 
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = createClient();
+    try (KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
 
       // We shouldn't be able to connect because we have no appropriate CA cert.
@@ -240,8 +241,7 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      final KuduClient newClient = createClient();
+    try (final KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
 
       // Try to connect to all the masters and assert there is no
@@ -283,8 +283,7 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = createClient();
+    try (KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
       System.err.println("=> imported auth");
 
@@ -384,17 +383,13 @@ public class TestSecurity {
     startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
     Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
     Assert.assertNotNull(subject);
-    try (Closeable c = cla.attach()) {
-      // Create a client attached to our own subject.
-      KuduClient newClient = createClientFromSubject(subject);
+    try (Closeable c = cla.attach();
+         // Create a client attached to our own subject.
+         KuduClient newClient = createClientFromSubject(subject)) {
       // It should not get auto-refreshed.
-      try {
-        assertEventualAuthenticationFailure(newClient,
-            "server requires authentication, but " +
-            "client Kerberos credentials (TGT) have expired");
-      } finally {
-        newClient.close();
-      }
+      assertEventualAuthenticationFailure(newClient,
+          "server requires authentication, but " +
+          "client Kerberos credentials (TGT) have expired");
     }
     Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
         "Using caller-provided subject with Kerberos principal test-admin@KRBTEST.COM."));
@@ -416,9 +411,9 @@ public class TestSecurity {
 
     Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
     Assert.assertNotNull(subject);
-    try (Closeable c = cla.attach()) {
-      // Create a client attached to our own subject.
-      KuduClient newClient = createClientFromSubject(subject);
+    try (Closeable c = cla.attach();
+         // Create a client attached to our own subject.
+         KuduClient newClient = createClientFromSubject(subject)) {
       // Run for longer than the renewable lifetime - this ensures that we
       // are indeed picking up the new credentials.
       for (Stopwatch sw = Stopwatch.createStarted();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index db3dcbd..20033ca 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -41,34 +41,34 @@ public class TestTimeouts {
    */
   @Test(timeout = 100000)
   public void testLowTimeouts() throws Exception {
-    KuduClient lowTimeoutsClient =
-        new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-        .defaultAdminOperationTimeoutMs(1)
-        .defaultOperationTimeoutMs(1)
-        .build();
+    try (KuduClient lowTimeoutsClient =
+         new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+         .defaultAdminOperationTimeoutMs(1)
+         .defaultOperationTimeoutMs(1)
+         .build()) {
+      try {
+        lowTimeoutsClient.listTabletServers();
+        fail("Should have timed out");
+      } catch (KuduException ex) {
+        // Expected.
+      }
 
-    try {
-      lowTimeoutsClient.listTabletServers();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      // Expected.
-    }
-
-    harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-    KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
+      harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+      KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-    KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
+      KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-    OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-    assertTrue(response.hasRowError());
-    assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+      OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+      assertTrue(response.hasRowError());
+      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
 
-    KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
-    try {
-      lowTimeoutScanner.nextRows();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
+      KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
+      try {
+        lowTimeoutScanner.nextRows();
+        fail("Should have timed out");
+      } catch (KuduException ex) {
+        assertTrue(ex.getStatus().isTimedOut());
+      }
     }
   }
 }
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index f63f941..969c712 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -178,13 +178,14 @@ public class KuduSink extends AbstractSink implements Configurable {
       if (client != null) {
         client.shutdown();
       }
-      client = null;
-      table = null;
-      session = null;
     } catch (Exception e) {
       ex = e;
       logger.error("Error closing client", e);
     }
+    client = null;
+    table = null;
+    session = null;
+
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
     if (ex != null) {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 461886c..a8716f3 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -478,7 +478,10 @@ private object KuduContext {
 }
 
 private object KuduClientCache {
+  val Log: Logger = LoggerFactory.getLogger(KuduClientCache.getClass)
+
   private case class CacheKey(kuduMaster: String, socketReadTimeoutMs: Option[Long])
+  private case class CacheValue(kuduClient: AsyncKuduClient, shutdownHookHandle: Runnable)
 
   /**
    * Set to
@@ -489,10 +492,24 @@ private object KuduClientCache {
    */
   private val ShutdownHookPriority = 100
 
-  private val clientCache = new mutable.HashMap[CacheKey, AsyncKuduClient]()
+  private val clientCache = new mutable.HashMap[CacheKey, CacheValue]()
 
   // Visible for testing.
-  private[kudu] def clearCacheForTests() = clientCache.clear()
+  private[kudu] def clearCacheForTests() = {
+    clientCache.values.foreach {
+      case cacheValue =>
+        try {
+          cacheValue.kuduClient.close()
+        } catch {
+          case e: Exception => Log.warn("Error while shutting down the test client", e);
+        }
+
+        // A client may only be closed once, so once we've closed this client,
+        // we mustn't close it again at shutdown time.
+        ShutdownHookManager.get().removeShutdownHook(cacheValue.shutdownHookHandle)
+    }
+    clientCache.clear()
+  }
 
   def getAsyncClient(kuduMaster: String, socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
     val cacheKey = CacheKey(kuduMaster, socketReadTimeoutMs)
@@ -505,14 +522,14 @@ private object KuduClientCache {
         }
 
         val asyncClient = builder.build()
-        ShutdownHookManager
-          .get()
-          .addShutdownHook(new Runnable {
-            override def run(): Unit = asyncClient.close()
-          }, ShutdownHookPriority)
-        clientCache.put(cacheKey, asyncClient)
+        val hookHandle = new Runnable {
+          override def run(): Unit = asyncClient.close()
+        }
+        ShutdownHookManager.get().addShutdownHook(hookHandle, ShutdownHookPriority)
+        val cacheValue = CacheValue(asyncClient, hookHandle)
+        clientCache.put(cacheKey, cacheValue)
       }
-      return clientCache(cacheKey)
+      return clientCache(cacheKey).kuduClient
     }
   }
 }
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index a462853..d4e008a 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -155,7 +155,7 @@ public class KuduTestHarness extends ExternalResource {
         // shutting down the sync client effectively does that.
       }
     } catch (KuduException e) {
-      LOG.warn("Error while shutting down the test client");
+      LOG.warn("Error while shutting down the test client", e);
     } finally {
       if (miniCluster != null) {
         miniCluster.shutdown();
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
index 95a3843..1722a57 100644
--- a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -89,8 +89,8 @@ public class TestMiniKuduCluster {
                                                       .numMasterServers(NUM_MASTERS)
                                                       .numTabletServers(NUM_TABLET_SERVERS)
                                                       .enableKerberos()
-                                                      .build()) {
-      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
       ListTablesResponse resp = client.getTablesList();
       assertTrue(resp.getTablesList().isEmpty());
       assertNull(client.getHiveMetastoreConfig());
@@ -103,8 +103,8 @@ public class TestMiniKuduCluster {
                                                       .numMasterServers(NUM_MASTERS)
                                                       .numTabletServers(NUM_TABLET_SERVERS)
                                                       .enableHiveMetastoreIntegration()
-                                                      .build()) {
-      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
       assertNotNull(client.getHiveMetastoreConfig());
     }
   }