You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/02/01 00:26:37 UTC

[kudu] branch master updated (d0490b3 -> 919a481)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from d0490b3  [docs] An instruction to add repaired disk back to cluster
     new 0b80b0a  [java] fix ITClient test retrying
     new 919a481  [java] better client and minicluster cleanup after tests finish

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test/java/org/apache/kudu/client/ITClient.java |  57 ++++---
 .../apache/kudu/client/TestAsyncKuduClient.java    |  16 +-
 .../apache/kudu/client/TestConnectToCluster.java   |  41 ++---
 .../apache/kudu/client/TestConnectionCache.java    |  15 +-
 .../org/apache/kudu/client/TestKuduClient.java     | 183 +++++++++++----------
 .../java/org/apache/kudu/client/TestSecurity.java  |  49 +++---
 .../java/org/apache/kudu/client/TestTimeouts.java  |  48 +++---
 .../java/org/apache/kudu/flume/sink/KuduSink.java  |   7 +-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   |  35 +++-
 .../java/org/apache/kudu/test/KuduTestHarness.java |   2 +-
 .../org/apache/kudu/test/TestMiniKuduCluster.java  |   8 +-
 11 files changed, 234 insertions(+), 227 deletions(-)


[kudu] 02/02: [java] better client and minicluster cleanup after tests finish

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 919a4814da26d70644d1fb8f356027a879917438
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jan 30 17:14:28 2019 -0800

    [java] better client and minicluster cleanup after tests finish
    
    Many tests create their own client and minicluster instances, but don't take
    care to close them properly in all cases. This patch fixes that by:
    1. Converting all temporary instantiations to use try-with-resources.
    2. For non-temporary instantiations, ensuring that close is always called,
       even if an exception is thrown mid-cleanup.
    
    The most interesting change is probably to KuduContext.scala, where we now
    close clients when clearing out the KuduClientCache (called by an @After).
    To avoid double closing, we need to unregister a client's shutdown hook
    after closing it.
    
    Change-Id: If91b3b2787915f0361537dd999e0daa8f7cb0a36
    Reviewed-on: http://gerrit.cloudera.org:8080/12322
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../apache/kudu/client/TestAsyncKuduClient.java    |  16 +-
 .../apache/kudu/client/TestConnectToCluster.java   |  41 ++---
 .../apache/kudu/client/TestConnectionCache.java    |  15 +-
 .../org/apache/kudu/client/TestKuduClient.java     | 183 +++++++++++----------
 .../java/org/apache/kudu/client/TestSecurity.java  |  49 +++---
 .../java/org/apache/kudu/client/TestTimeouts.java  |  48 +++---
 .../java/org/apache/kudu/flume/sink/KuduSink.java  |   7 +-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   |  35 +++-
 .../java/org/apache/kudu/test/KuduTestHarness.java |   2 +-
 .../org/apache/kudu/test/TestMiniKuduCluster.java  |   8 +-
 10 files changed, 200 insertions(+), 204 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index a622459..d79d114 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -143,13 +143,15 @@ public class TestAsyncKuduClient {
     final int requestBatchSize = 10;
 
     // Test that a bad hostname for the master makes us error out quickly.
-    AsyncKuduClient invalidClient = new AsyncKuduClient.AsyncKuduClientBuilder(badHostname).build();
-    try {
-      invalidClient.listTabletServers().join(1000);
-      fail("This should have failed quickly");
-    } catch (Exception ex) {
-      assertTrue(ex instanceof NonRecoverableException);
-      assertTrue(ex.getMessage().contains(badHostname));
+    try (AsyncKuduClient invalidClient =
+           new AsyncKuduClient.AsyncKuduClientBuilder(badHostname).build()) {
+      try {
+        invalidClient.listTabletServers().join(1000);
+        fail("This should have failed quickly");
+      } catch (Exception ex) {
+        assertTrue(ex instanceof NonRecoverableException);
+        assertTrue(ex.getMessage().contains(badHostname));
+      }
     }
 
     List<Master.TabletLocationsPB> tabletLocations = new ArrayList<>();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index bb73585..c755f63 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -47,23 +47,16 @@ public class TestConnectToCluster {
    */
   @Test(timeout=60000)
   public void testFallbackConnectRpc() throws Exception {
-    MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-        .addMasterServerFlag("--master_support_connect_to_master_rpc=0")
-        .numMasterServers(1)
-        .numTabletServers(0)
-        .build();
-    KuduClient c = null;
-    try {
-      c = new KuduClient.KuduClientBuilder(cluster.getMasterAddressesAsString())
-          .build();
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+         .addMasterServerFlag("--master_support_connect_to_master_rpc=0")
+         .numMasterServers(1)
+         .numTabletServers(0)
+         .build();
+         KuduClient c = new KuduClient.KuduClientBuilder(cluster.getMasterAddressesAsString())
+         .build()) {
       // Call some method which uses the master. This forces us to connect
       // and verifies that the fallback works.
       c.listTabletServers();
-    } finally {
-      if (c != null) {
-        c.close();
-      }
-      cluster.shutdown();
     }
   }
 
@@ -75,18 +68,15 @@ public class TestConnectToCluster {
    */
   @Test(timeout=60000)
   public void testConnectToOneOfManyMasters() throws Exception {
-    MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
-        .numMasterServers(3)
-        .numTabletServers(0)
-        .build();
     int successes = 0;
-    try {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+         .numMasterServers(3)
+         .numTabletServers(0)
+         .build()) {
       String[] masterAddrs = cluster.getMasterAddressesAsString().split(",", -1);
       assertEquals(3, masterAddrs.length);
       for (String masterAddr : masterAddrs) {
-        KuduClient c = null;
-        try {
-          c = new KuduClient.KuduClientBuilder(masterAddr).build();
+        try (KuduClient c = new KuduClient.KuduClientBuilder(masterAddr).build()) {
           // Call some method which uses the master. This forces us to connect.
           c.listTabletServers();
           successes++;
@@ -98,15 +88,10 @@ public class TestConnectToCluster {
                   "\\(.+?,.+?,.+?\\).*"));
           Assert.assertThat(Joiner.on("\n").join(e.getStackTrace()),
               CoreMatchers.containsString("testConnectToOneOfManyMasters"));
-        } finally {
-          if (c != null) {
-            c.close();
-          }
         }
       }
-    } finally {
-      cluster.shutdown();
     }
+
     // Typically, one of the connections will have succeeded. However, it's possible
     // that 0 succeeded in the case that the masters were slow at electing
     // themselves.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 134b814..ead72b2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -37,12 +37,11 @@ public class TestConnectionCache {
 
   @Test(timeout = 50000)
   public void test() throws Exception {
-    MiniKuduCluster cluster = null;
-    try {
-      cluster = new MiniKuduCluster.MiniKuduClusterBuilder().numMasterServers(3).build();
-
-      final AsyncKuduClient client =
-          new AsyncKuduClient.AsyncKuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(3)
+                                                      .build();
+         AsyncKuduClient client = new AsyncKuduClient.AsyncKuduClientBuilder(
+             cluster.getMasterAddressesAsString()).build()) {
       // Below we ping the masters directly using RpcProxy, so if they aren't ready to process
       // RPCs we'll get an error. Here by listing the tables we make sure this won't happen since
       // it won't return until a master leader is found.
@@ -94,10 +93,6 @@ public class TestConnectionCache {
         waitForConnectionToTerminate(c);
       }
       assertTrue(allConnectionsTerminated(client));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 930abf2..303f53a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -877,40 +877,40 @@ public class TestKuduClient {
   @Test(timeout = 100000)
   public void testCustomNioExecutor() throws Exception {
     long startTime = System.nanoTime();
-    final KuduClient localClient = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-        .nioExecutors(Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(2))
-        .bossCount(1)
-        .workerCount(2)
-        .build();
-    long buildTime = (System.nanoTime() - startTime) / 1000000000L;
-    assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
-    localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
-    Thread[] threads = new Thread[4];
-    for (int t = 0; t < 4; t++) {
-      final int id = t;
-      threads[t] = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            KuduTable table = localClient.openTable(TABLE_NAME);
-            KuduSession session = localClient.newSession();
-            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
-            for (int i = 0; i < 100; i++) {
-              Insert insert = createBasicSchemaInsert(table, id * 100 + i);
-              session.apply(insert);
+    try (KuduClient localClient = new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+         .nioExecutors(Executors.newFixedThreadPool(1), Executors.newFixedThreadPool(2))
+         .bossCount(1)
+         .workerCount(2)
+         .build()) {
+      long buildTime = (System.nanoTime() - startTime) / 1000000000L;
+      assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
+      localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+      Thread[] threads = new Thread[4];
+      for (int t = 0; t < 4; t++) {
+        final int id = t;
+        threads[t] = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              KuduTable table = localClient.openTable(TABLE_NAME);
+              KuduSession session = localClient.newSession();
+              session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+              for (int i = 0; i < 100; i++) {
+                Insert insert = createBasicSchemaInsert(table, id * 100 + i);
+                session.apply(insert);
+              }
+              session.close();
+            } catch (Exception e) {
+              fail("insert thread should not throw exception: " + e);
             }
-            session.close();
-          } catch (Exception e) {
-            fail("insert thread should not throw exception: " + e);
           }
-        }
-      });
-      threads[t].start();
-    }
-    for (int t = 0; t< 4;t++) {
-      threads[t].join();
+        });
+        threads[t].start();
+      }
+      for (int t = 0; t< 4;t++) {
+        threads[t].join();
+      }
     }
-    localClient.shutdown();
   }
 
   @Test(expected=IllegalArgumentException.class)
@@ -1063,50 +1063,49 @@ public class TestKuduClient {
                   .build();
           // From the same client continuously performs inserts to a tablet
           // in the given flush mode.
-          KuduClient kuduClient = asyncKuduClient.syncClient();
-          KuduSession session = kuduClient.newSession();
-          session.setFlushMode(flushMode);
-          KuduTable table = kuduClient.openTable(TABLE_NAME);
-          for (int i = 0; i < 3; i++) {
-            for (int j = 100 * i; j < 100 * (i + 1); j++) {
-              Insert insert = table.newInsert();
-              PartialRow row = insert.getRow();
-              row.addString("key", String.format("key_%02d", j));
-              row.addString("c1", "c1_" + j);
-              row.addString("c2", "c2_" + j);
-              row.addString("c3", "c3_" + j);
-              session.apply(insert);
-            }
-            session.flush();
-
-            // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
-            // that count the rows. And verify that the count of the rows
-            // never go down from what previously observed, to ensure subsequent
-            // reads will not "go back in time" regarding writes that other
-            // clients have done.
-            for (int k = 0; k < 3; k++) {
-              AsyncKuduScanner scanner = asyncKuduClient.newScannerBuilder(table)
-                      .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
-                      .replicaSelection(replicaSelection)
-                      .build();
-              KuduScanner syncScanner = new KuduScanner(scanner);
-              long preTs = asyncKuduClient.getLastPropagatedTimestamp();
-              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
-
-              long row_count = countRowsInScan(syncScanner);
-              long expected_count = 100L * (i + 1);
-              assertTrue(expected_count <= row_count);
-
-              // After the scan, verify that the chosen snapshot timestamp is
-              // returned from the server and it is larger than the previous
-              // propagated timestamp.
-              assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
-              assertTrue(preTs < scanner.getSnapshotTimestamp());
-              syncScanner.close();
+          try (KuduClient kuduClient = asyncKuduClient.syncClient()) {
+            KuduSession session = kuduClient.newSession();
+            session.setFlushMode(flushMode);
+            KuduTable table = kuduClient.openTable(TABLE_NAME);
+            for (int i = 0; i < 3; i++) {
+              for (int j = 100 * i; j < 100 * (i + 1); j++) {
+                Insert insert = table.newInsert();
+                PartialRow row = insert.getRow();
+                row.addString("key", String.format("key_%02d", j));
+                row.addString("c1", "c1_" + j);
+                row.addString("c2", "c2_" + j);
+                row.addString("c3", "c3_" + j);
+                session.apply(insert);
+              }
+              session.flush();
+
+              // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
+              // that count the rows. And verify that the count of the rows
+              // never go down from what previously observed, to ensure subsequent
+              // reads will not "go back in time" regarding writes that other
+              // clients have done.
+              for (int k = 0; k < 3; k++) {
+                AsyncKuduScanner scanner = asyncKuduClient.newScannerBuilder(table)
+                                           .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+                                           .replicaSelection(replicaSelection)
+                                           .build();
+                KuduScanner syncScanner = new KuduScanner(scanner);
+                long preTs = asyncKuduClient.getLastPropagatedTimestamp();
+                assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, preTs);
+
+                long row_count = countRowsInScan(syncScanner);
+                long expected_count = 100L * (i + 1);
+                assertTrue(expected_count <= row_count);
+
+                // After the scan, verify that the chosen snapshot timestamp is
+                // returned from the server and it is larger than the previous
+                // propagated timestamp.
+                assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
+                assertTrue(preTs < scanner.getSnapshotTimestamp());
+                syncScanner.close();
+              }
             }
           }
-
-          kuduClient.close();
           return null;
         }
       };
@@ -1127,29 +1126,31 @@ public class TestKuduClient {
     Method methodToInvoke = KuduClient.class.getMethod(clientMethodName);
 
     for (int i = 0; i < 5; i++) {
-      KuduClient cl = new KuduClient.KuduClientBuilder(
-          harness.getMasterAddressesAsString()).build();
-      harness.restartLeaderMaster();
-
-      // There's a good chance that this executes while there's no leader
-      // master. It should retry until the leader election completes and a new
-      // leader master is elected.
-      methodToInvoke.invoke(cl);
+      try (KuduClient cl = new KuduClient.KuduClientBuilder(
+          harness.getMasterAddressesAsString()).build()) {
+        harness.restartLeaderMaster();
+
+        // There's a good chance that this executes while there's no leader
+        // master. It should retry until the leader election completes and a new
+        // leader master is elected.
+        methodToInvoke.invoke(cl);
+      }
     }
 
     // With all masters down, exportAuthenticationCredentials() should time out.
     harness.killAllMasterServers();
-    KuduClient cl = new KuduClient.KuduClientBuilder(
+    try (KuduClient cl = new KuduClient.KuduClientBuilder(
         harness.getMasterAddressesAsString())
-                    .defaultAdminOperationTimeoutMs(5000) // speed up the test
-                    .build();
-    try {
-      methodToInvoke.invoke(cl);
-      fail();
-    } catch (InvocationTargetException ex) {
-      assertTrue(ex.getTargetException() instanceof KuduException);
-      KuduException realEx = (KuduException)ex.getTargetException();
-      assertTrue(realEx.getStatus().isTimedOut());
+         .defaultAdminOperationTimeoutMs(5000) // speed up the test
+         .build()) {
+      try {
+        methodToInvoke.invoke(cl);
+        fail();
+      } catch (InvocationTargetException ex) {
+        assertTrue(ex.getTargetException() instanceof KuduException);
+        KuduException realEx = (KuduException)ex.getTargetException();
+        assertTrue(realEx.getStatus().isTimedOut());
+      }
     }
   }
 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 197e57c..210fa2a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -95,11 +95,14 @@ public class TestSecurity {
 
   @After
   public void tearDown() throws IOException {
-    if (client != null) {
-      client.close();
-    }
-    if (miniCluster != null) {
-      miniCluster.shutdown();
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
     }
   }
 
@@ -145,9 +148,8 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(
-          miniCluster.getMasterAddressesAsString()).build();
+    try (KuduClient newClient = new KuduClient.KuduClientBuilder(
+        miniCluster.getMasterAddressesAsString()).build()) {
 
       // Test that a client with no credentials cannot list servers.
       try {
@@ -206,8 +208,7 @@ public class TestSecurity {
 
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = createClient();
+    try (KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
 
       // We shouldn't be able to connect because we have no appropriate CA cert.
@@ -240,8 +241,7 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      final KuduClient newClient = createClient();
+    try (final KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
 
       // Try to connect to all the masters and assert there is no
@@ -283,8 +283,7 @@ public class TestSecurity {
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
-    try {
-      KuduClient newClient = createClient();
+    try (KuduClient newClient = createClient()) {
       newClient.importAuthenticationCredentials(authnData);
       System.err.println("=> imported auth");
 
@@ -384,17 +383,13 @@ public class TestSecurity {
     startCluster(ImmutableSet.of(Option.SHORT_TOKENS_AND_TICKETS));
     Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
     Assert.assertNotNull(subject);
-    try (Closeable c = cla.attach()) {
-      // Create a client attached to our own subject.
-      KuduClient newClient = createClientFromSubject(subject);
+    try (Closeable c = cla.attach();
+         // Create a client attached to our own subject.
+         KuduClient newClient = createClientFromSubject(subject)) {
       // It should not get auto-refreshed.
-      try {
-        assertEventualAuthenticationFailure(newClient,
-            "server requires authentication, but " +
-            "client Kerberos credentials (TGT) have expired");
-      } finally {
-        newClient.close();
-      }
+      assertEventualAuthenticationFailure(newClient,
+          "server requires authentication, but " +
+          "client Kerberos credentials (TGT) have expired");
     }
     Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
         "Using caller-provided subject with Kerberos principal test-admin@KRBTEST.COM."));
@@ -416,9 +411,9 @@ public class TestSecurity {
 
     Subject subject = SecurityUtil.getSubjectFromTicketCacheOrNull();
     Assert.assertNotNull(subject);
-    try (Closeable c = cla.attach()) {
-      // Create a client attached to our own subject.
-      KuduClient newClient = createClientFromSubject(subject);
+    try (Closeable c = cla.attach();
+         // Create a client attached to our own subject.
+         KuduClient newClient = createClientFromSubject(subject)) {
       // Run for longer than the renewable lifetime - this ensures that we
       // are indeed picking up the new credentials.
       for (Stopwatch sw = Stopwatch.createStarted();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index db3dcbd..20033ca 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -41,34 +41,34 @@ public class TestTimeouts {
    */
   @Test(timeout = 100000)
   public void testLowTimeouts() throws Exception {
-    KuduClient lowTimeoutsClient =
-        new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
-        .defaultAdminOperationTimeoutMs(1)
-        .defaultOperationTimeoutMs(1)
-        .build();
+    try (KuduClient lowTimeoutsClient =
+         new KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString())
+         .defaultAdminOperationTimeoutMs(1)
+         .defaultOperationTimeoutMs(1)
+         .build()) {
+      try {
+        lowTimeoutsClient.listTabletServers();
+        fail("Should have timed out");
+      } catch (KuduException ex) {
+        // Expected.
+      }
 
-    try {
-      lowTimeoutsClient.listTabletServers();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      // Expected.
-    }
-
-    harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-    KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
+      harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+      KuduTable table = lowTimeoutsClient.openTable(TABLE_NAME);
 
-    KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
+      KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-    OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-    assertTrue(response.hasRowError());
-    assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+      OperationResponse response = lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+      assertTrue(response.hasRowError());
+      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
 
-    KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
-    try {
-      lowTimeoutScanner.nextRows();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
+      KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
+      try {
+        lowTimeoutScanner.nextRows();
+        fail("Should have timed out");
+      } catch (KuduException ex) {
+        assertTrue(ex.getStatus().isTimedOut());
+      }
     }
   }
 }
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index f63f941..969c712 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -178,13 +178,14 @@ public class KuduSink extends AbstractSink implements Configurable {
       if (client != null) {
         client.shutdown();
       }
-      client = null;
-      table = null;
-      session = null;
     } catch (Exception e) {
       ex = e;
       logger.error("Error closing client", e);
     }
+    client = null;
+    table = null;
+    session = null;
+
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
     if (ex != null) {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 461886c..a8716f3 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -478,7 +478,10 @@ private object KuduContext {
 }
 
 private object KuduClientCache {
+  val Log: Logger = LoggerFactory.getLogger(KuduClientCache.getClass)
+
   private case class CacheKey(kuduMaster: String, socketReadTimeoutMs: Option[Long])
+  private case class CacheValue(kuduClient: AsyncKuduClient, shutdownHookHandle: Runnable)
 
   /**
    * Set to
@@ -489,10 +492,24 @@ private object KuduClientCache {
    */
   private val ShutdownHookPriority = 100
 
-  private val clientCache = new mutable.HashMap[CacheKey, AsyncKuduClient]()
+  private val clientCache = new mutable.HashMap[CacheKey, CacheValue]()
 
   // Visible for testing.
-  private[kudu] def clearCacheForTests() = clientCache.clear()
+  private[kudu] def clearCacheForTests() = {
+    clientCache.values.foreach {
+      case cacheValue =>
+        try {
+          cacheValue.kuduClient.close()
+        } catch {
+          case e: Exception => Log.warn("Error while shutting down the test client", e);
+        }
+
+        // A client may only be closed once, so once we've closed this client,
+        // we mustn't close it again at shutdown time.
+        ShutdownHookManager.get().removeShutdownHook(cacheValue.shutdownHookHandle)
+    }
+    clientCache.clear()
+  }
 
   def getAsyncClient(kuduMaster: String, socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
     val cacheKey = CacheKey(kuduMaster, socketReadTimeoutMs)
@@ -505,14 +522,14 @@ private object KuduClientCache {
         }
 
         val asyncClient = builder.build()
-        ShutdownHookManager
-          .get()
-          .addShutdownHook(new Runnable {
-            override def run(): Unit = asyncClient.close()
-          }, ShutdownHookPriority)
-        clientCache.put(cacheKey, asyncClient)
+        val hookHandle = new Runnable {
+          override def run(): Unit = asyncClient.close()
+        }
+        ShutdownHookManager.get().addShutdownHook(hookHandle, ShutdownHookPriority)
+        val cacheValue = CacheValue(asyncClient, hookHandle)
+        clientCache.put(cacheKey, cacheValue)
       }
-      return clientCache(cacheKey)
+      return clientCache(cacheKey).kuduClient
     }
   }
 }
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index a462853..d4e008a 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -155,7 +155,7 @@ public class KuduTestHarness extends ExternalResource {
         // shutting down the sync client effectively does that.
       }
     } catch (KuduException e) {
-      LOG.warn("Error while shutting down the test client");
+      LOG.warn("Error while shutting down the test client", e);
     } finally {
       if (miniCluster != null) {
         miniCluster.shutdown();
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
index 95a3843..1722a57 100644
--- a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -89,8 +89,8 @@ public class TestMiniKuduCluster {
                                                       .numMasterServers(NUM_MASTERS)
                                                       .numTabletServers(NUM_TABLET_SERVERS)
                                                       .enableKerberos()
-                                                      .build()) {
-      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
       ListTablesResponse resp = client.getTablesList();
       assertTrue(resp.getTablesList().isEmpty());
       assertNull(client.getHiveMetastoreConfig());
@@ -103,8 +103,8 @@ public class TestMiniKuduCluster {
                                                       .numMasterServers(NUM_MASTERS)
                                                       .numTabletServers(NUM_TABLET_SERVERS)
                                                       .enableHiveMetastoreIntegration()
-                                                      .build()) {
-      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
       assertNotNull(client.getHiveMetastoreConfig());
     }
   }


[kudu] 01/02: [java] fix ITClient test retrying

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0b80b0abae99c56db20e96249981a48886c56d33
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jan 30 17:09:56 2019 -0800

    [java] fix ITClient test retrying
    
    ITClient is flaky due to KUDU-2390, but it's also unable to retry upon
    failure. I think this is caused by the static error latch, which isn't reset
    in between test runs. There's no reason for it to be static; I'm guessing
    it's a holdover from when the minicluster state was static.
    
    I also added an @After to close the client instance used by the test, which
    will hopefully reduce the amount of log pollution in the retries.
    
    Change-Id: I4a9f6a27541de02bbc8cf9dc3ba72824a6324a08
    Reviewed-on: http://gerrit.cloudera.org:8080/12321
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../test/java/org/apache/kudu/client/ITClient.java | 57 +++++++++++++---------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 06f25f8..e28c1f2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.kudu.test.KuduTestHarness;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -57,15 +58,16 @@ public class ITClient {
 
   private static final String TABLE_NAME =
       ITClient.class.getName() + "-" + System.currentTimeMillis();
+
   // One error and we stop the test.
-  private static final CountDownLatch KEEP_RUNNING_LATCH = new CountDownLatch(1);
+  private final CountDownLatch keepRunningLatch = new CountDownLatch(1);
   // Latch used to track if an error occurred and we need to stop the test early.
-  private static final CountDownLatch ERROR_LATCH = new CountDownLatch(1);
+  private final CountDownLatch errorLatch = new CountDownLatch(1);
 
-  private static KuduClient localClient;
-  private static AsyncKuduClient localAsyncClient;
-  private static KuduTable table;
-  private static long runtimeInSeconds;
+  private KuduClient localClient;
+  private AsyncKuduClient localAsyncClient;
+  private KuduTable table;
+  private long runtimeInSeconds;
 
   private volatile long sharedWriteTimestamp;
 
@@ -97,6 +99,15 @@ public class ITClient {
     table = localClient.createTable(TABLE_NAME, getBasicSchema(), builder);
   }
 
+  @After
+  public void tearDown() throws Exception {
+    if (localClient != null) {
+      localClient.shutdown();
+      // No need to explicitly shutdown the async client,
+      // shutting down the sync client effectively does that.
+    }
+  }
+
   @Test(timeout = TEST_TIMEOUT_SECONDS)
   public void test() throws Exception {
 
@@ -117,11 +128,11 @@ public class ITClient {
     }
 
     // await() returns yes if the latch reaches 0, we don't want that.
-    Assert.assertFalse("Look for the last ERROR line in the log that comes from ITCLient",
-        ERROR_LATCH.await(runtimeInSeconds, TimeUnit.SECONDS));
+    Assert.assertFalse("Look for the last ERROR line in the log that comes from ITClient",
+        errorLatch.await(runtimeInSeconds, TimeUnit.SECONDS));
 
     // Indicate we want to stop, then wait a little bit for it to happen.
-    KEEP_RUNNING_LATCH.countDown();
+    keepRunningLatch.countDown();
 
     for (Thread thread : threads) {
       // Give plenty of time for threads to stop.
@@ -140,7 +151,7 @@ public class ITClient {
    */
   private void reportError(String message, Exception exception) {
     LOG.error(message, exception);
-    ERROR_LATCH.countDown();
+    errorLatch.countDown();
   }
 
   /**
@@ -153,11 +164,11 @@ public class ITClient {
     @Override
     public void run() {
       try {
-        KEEP_RUNNING_LATCH.await(2, TimeUnit.SECONDS);
+        keepRunningLatch.await(2, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         return;
       }
-      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+      while (keepRunningLatch.getCount() > 0) {
         try {
           boolean shouldContinue;
           int randomInt = random.nextInt(3);
@@ -172,7 +183,7 @@ public class ITClient {
           if (!shouldContinue) {
             return;
           }
-          KEEP_RUNNING_LATCH.await(5, TimeUnit.SECONDS);
+          keepRunningLatch.await(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
           return;
         }
@@ -195,7 +206,7 @@ public class ITClient {
         connections.get(random.nextInt(connections.size())).disconnect();
 
       } catch (Exception e) {
-        if (KEEP_RUNNING_LATCH.getCount() == 0) {
+        if (keepRunningLatch.getCount() == 0) {
           // Likely shutdown() related.
           return false;
         }
@@ -248,7 +259,7 @@ public class ITClient {
     @Override
     public void run() {
       session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
-      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+      while (keepRunningLatch.getCount() > 0) {
         try {
           OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
           if (hasRowErrorAndReport(resp)) {
@@ -276,7 +287,7 @@ public class ITClient {
             }
           }
         } catch (Exception e) {
-          if (KEEP_RUNNING_LATCH.getCount() == 0) {
+          if (keepRunningLatch.getCount() == 0) {
             // Likely shutdown() related.
             return;
           }
@@ -315,7 +326,7 @@ public class ITClient {
 
     @Override
     public void run() {
-      while (KEEP_RUNNING_LATCH.getCount() > 0) {
+      while (keepRunningLatch.getCount() > 0) {
 
         boolean shouldContinue;
 
@@ -335,7 +346,7 @@ public class ITClient {
 
         if (lastRowCount == 0) {
           try {
-            KEEP_RUNNING_LATCH.await(50, TimeUnit.MILLISECONDS);
+            keepRunningLatch.await(50, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
             // Test is stopping.
             return;
@@ -390,7 +401,7 @@ public class ITClient {
       DeadlineTracker deadlineTracker = new DeadlineTracker();
       deadlineTracker.setDeadline(DEFAULT_SLEEP);
 
-      while (KEEP_RUNNING_LATCH.getCount() > 0 && !deadlineTracker.timedOut()) {
+      while (keepRunningLatch.getCount() > 0 && !deadlineTracker.timedOut()) {
         KuduScanner scanner = getScannerBuilder().build();
 
         try {
@@ -406,13 +417,13 @@ public class ITClient {
           }
           return true;
         } else {
-          reportError("Row count unexpectedly decreased from " + lastRowCount + "to " + rowCount,
+          reportError("Row count unexpectedly decreased from " + lastRowCount + " to " + rowCount,
               null);
         }
 
         // Due to the lack of KUDU-430, we need to loop for a while.
         try {
-          KEEP_RUNNING_LATCH.await(50, TimeUnit.MILLISECONDS);
+          keepRunningLatch.await(50, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
           // No need to do anything, we'll exit the loop once we test getCount() in the condition.
         }
@@ -462,9 +473,9 @@ public class ITClient {
     @Override
     public void uncaughtException(Thread t, Throwable e) {
       // Only report an error if we're still running, else we'll spam the log.
-      if (KEEP_RUNNING_LATCH.getCount() != 0) {
+      if (keepRunningLatch.getCount() != 0) {
         reportError("Uncaught exception", new Exception(e));
       }
     }
   }
-}
\ No newline at end of file
+}