You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/05/03 19:25:45 UTC

kudu git commit: [java client] improve flaky test ITClient

Repository: kudu
Updated Branches:
  refs/heads/master 10ab56ffe -> ca7fffec9


[java client] improve flaky test ITClient

This changes the ITClient test to use a fault-tolerant scanner. It
also fixes client-side disconnection error handling for both the
fault-tolerant and the non-fault-tolerant scanner by keeping
track of the tablet server each ScanRequest is talking to.
Finally, it adds more test coverage for multi-tablet scans using
client-side fault injection.

Change-Id: I81946e951db69d6c75c6a2e22a802cc4f4c447b9
Reviewed-on: http://gerrit.cloudera.org:8080/6776
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ca7fffec
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ca7fffec
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ca7fffec

Branch: refs/heads/master
Commit: ca7fffec94bd1e81dcc9ee819b3658fc7993ea8e
Parents: 10ab56f
Author: hahao <ha...@cloudera.com>
Authored: Mon May 1 16:31:02 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed May 3 19:25:24 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  19 ++-
 .../apache/kudu/client/AsyncKuduScanner.java    |   9 +-
 .../java/org/apache/kudu/client/ITClient.java   |   3 +-
 .../kudu/client/ITFaultTolerantScanner.java     |  26 +++-
 .../kudu/client/ITNonFaultTolerantScanner.java  |  39 +-----
 .../kudu/client/ITScannerMultiTablet.java       | 132 +++++++++++++------
 6 files changed, 148 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 9ce0536..fbc181a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -726,14 +726,29 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param replicaSelection replica selection mechanism to use
    */
   @VisibleForTesting
-  void closeCurrentConnection(RemoteTablet tablet,
-        ReplicaSelection replicaSelection) {
+  void shutdownConnection(RemoteTablet tablet,
+                          ReplicaSelection replicaSelection) {
     TabletClient client = connectionCache.getClient(
         tablet.getReplicaSelectedUUID(replicaSelection));
     client.shutdown();
   }
 
   /**
+   * Forcefully disconnects the RemoteTablet connection and
+   * fails all outstanding RPCs.
+   *
+   * @param tablet the given tablet
+   * @param replicaSelection replica selection mechanism to use
+   */
+  @VisibleForTesting
+  void disconnect(RemoteTablet tablet,
+                  ReplicaSelection replicaSelection) {
+    TabletClient client = connectionCache.getClient(
+        tablet.getReplicaSelectedUUID(replicaSelection));
+    client.disconnect();
+  }
+
+  /**
    * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
    * of the tablet identified by the RPC's table and partition key.
    *

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index d471830..8081e6f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -639,21 +639,21 @@ public final class AsyncKuduScanner {
    */
   KuduRpc<Response> getOpenRequest() {
     checkScanningNotStarted();
-    return new ScanRequest(table, State.OPENING);
+    return new ScanRequest(table, State.OPENING, tablet);
   }
 
   /**
    * Returns an RPC to fetch the next rows.
    */
   KuduRpc<Response> getNextRowsRequest() {
-    return new ScanRequest(table, State.NEXT);
+    return new ScanRequest(table, State.NEXT, tablet);
   }
 
   /**
    * Returns an RPC to close this scanner.
    */
   KuduRpc<Response> getCloseRequest() {
-    return new ScanRequest(table, State.CLOSING);
+    return new ScanRequest(table, State.CLOSING, tablet);
   }
 
   /**
@@ -740,8 +740,9 @@ public final class AsyncKuduScanner {
 
     State state;
 
-    ScanRequest(KuduTable table, State state) {
+    ScanRequest(KuduTable table, State state, RemoteTablet tablet) {
       super(table);
+      setTablet(tablet);
       this.state = state;
       this.setTimeoutMillis(scanRequestTimeout);
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index d542f1e..0eaefa5 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -413,7 +413,8 @@ public class ITClient extends BaseKuduTest {
     private KuduScanner.KuduScannerBuilder getScannerBuilder() {
       return localClient.newScannerBuilder(table)
           .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
-          .snapshotTimestampRaw(sharedWriteTimestamp);
+          .snapshotTimestampRaw(sharedWriteTimestamp)
+          .setFaultTolerant(true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
index b2941a3..1e243a6 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
@@ -25,12 +25,30 @@ import org.junit.Test;
  */
 public class ITFaultTolerantScanner extends ITScannerMultiTablet {
   /**
+   * Verifies that a fault tolerant scanner can proceed
+   * properly even if the client connection is shut down.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantShutDown() throws KuduException {
+    clientFaultInjection(true, true);
+  }
+
+  /**
+   * Verifies that a fault tolerant scanner can proceed
+   * properly even if the client connection is disconnected.
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantDisconnect() throws KuduException {
+    clientFaultInjection(false, true);
+  }
+
+  /**
    * Tests fault tolerant scanner by restarting the tablet server in the middle
    * of tablet scanning and verifies the scan results are as expected.
    */
   @Test(timeout = 100000)
   public void testFaultTolerantScannerRestart() throws Exception {
-    faultInjectionScanner(true, true, false);
+    serverFaultInjection(true, true, false);
   }
 
   /**
@@ -39,7 +57,7 @@ public class ITFaultTolerantScanner extends ITScannerMultiTablet {
    */
   @Test(timeout = 100000)
   public void testFaultTolerantScannerKill() throws Exception {
-    faultInjectionScanner(false, true, false);
+    serverFaultInjection(false, true, false);
   }
 
   /**
@@ -48,7 +66,7 @@ public class ITFaultTolerantScanner extends ITScannerMultiTablet {
    */
   @Test(timeout = 100000)
   public void testFaultTolerantScannerKillFinishFirstTablet() throws Exception {
-    faultInjectionScanner(false, true, true);
+    serverFaultInjection(false, true, true);
   }
 
   /**
@@ -57,6 +75,6 @@ public class ITFaultTolerantScanner extends ITScannerMultiTablet {
    */
   @Test(timeout = 100000)
   public void testFaultTolerantScannerRestartFinishFirstTablet() throws Exception {
-    faultInjectionScanner(true, true, true);
+    serverFaultInjection(true, true, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
index 21ab0ad..79a49c9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
@@ -34,8 +34,8 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
   @Test(timeout = 100000)
   public void testKudu1343() throws Exception {
     KuduScanner scanner = syncClient.newScannerBuilder(table)
-    .batchSizeBytes(1) // Just a hint, won't actually be that small
-    .build();
+        .batchSizeBytes(1) // Just a hint, won't actually be that small
+        .build();
 
     int rowCount = 0;
     int loopCount = 0;
@@ -51,36 +51,11 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
 
   /**
    * Verifies for non fault tolerant scanner, it can proceed
-   * properly even if there is a disconnection.
+   * properly even if the client connection is shut down.
    */
   @Test(timeout = 100000)
-  public void testNonFaultTolerantDisconnect() throws KuduException {
-    KuduScanner scanner = syncClient.newScannerBuilder(table)
-    .batchSizeBytes(1)
-    .build();
-
-    int rowCount = 0;
-    int loopCount = 0;
-    if (scanner.hasMoreRows()) {
-      loopCount++;
-      RowResultIterator rri = scanner.nextRows();
-      rowCount += rri.getNumRows();
-    }
-
-    // Forcefully shuts down the current connection and
-    // fails all outstanding RPCs in the middle of
-    // scanning.
-    client.closeCurrentConnection(scanner.currentTablet(),
-    scanner.getReplicaSelection());
-
-    while (scanner.hasMoreRows()) {
-      loopCount++;
-      RowResultIterator rri = scanner.nextRows();
-      rowCount += rri.getNumRows();
-    }
-
-    assertTrue(loopCount > TABLET_COUNT);
-    assertEquals(ROW_COUNT, rowCount);
+  public void testNonFaultTolerantShutDown() throws KuduException {
+    clientFaultInjection(true, false);
   }
 
   /**
@@ -89,7 +64,7 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
    */
   @Test(timeout = 100000, expected=NonRecoverableException.class)
   public void testNonFaultTolerantScannerKill() throws Exception {
-    faultInjectionScanner(false, false, false);
+    serverFaultInjection(false, false, false);
   }
 
   /**
@@ -98,6 +73,6 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
    */
   @Test(timeout = 100000, expected=NonRecoverableException.class)
   public void testNonFaultTolerantScannerRestart() throws Exception {
-    faultInjectionScanner(true, false, false);
+    serverFaultInjection(true, false, false);
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca7fffec/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 78b9721..c9b930e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -17,10 +17,12 @@
 package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 
 import com.google.common.collect.Lists;
+import org.junit.After;
 import org.junit.BeforeClass;
 
 import org.apache.kudu.Schema;
@@ -72,6 +74,11 @@ public class ITScannerMultiTablet extends BaseKuduTest {
     assertEquals(0, session.countPendingErrors());
   }
 
+  @After
+  public void tearDown() throws Exception {
+    restartTabletServers();
+  }
+
   /**
    * Injecting failures (kill or restart TabletServer) while scanning, to verify:
    * fault tolerant scanner will continue scan and non-fault tolerant scanner will throw
@@ -81,65 +88,116 @@ public class ITScannerMultiTablet extends BaseKuduTest {
    * we get rows in order from 3 tablets. We detect those tablet boundaries when keys suddenly
    * become smaller than what was previously seen.
    *
-   * @param shouldRestart if true restarts TabletServer, otherwise kills TabletServer
+   * @param restart if true restarts TabletServer, otherwise kills TabletServer
    * @param isFaultTolerant if true uses fault tolerant scanner, otherwise
    *                        uses non fault-tolerant one
    * @param finishFirstScan if true injects failure before finishing first tablet scan,
    *                        otherwise in the middle of tablet scanning
    * @throws Exception
    */
-  void faultInjectionScanner(boolean shouldRestart, boolean isFaultTolerant,
+  void serverFaultInjection(boolean restart, boolean isFaultTolerant,
       boolean finishFirstScan) throws Exception {
     KuduScanner scanner = syncClient.newScannerBuilder(table)
         .setFaultTolerant(isFaultTolerant)
         .batchSizeBytes(1)
         .setProjectedColumnIndexes(Lists.newArrayList(0)).build();
 
-    int rowCount = 0;
-    int previousRow = -1;
-    int tableBoundariesCount = 0;
-    if (scanner.hasMoreRows()) {
-      RowResultIterator rri = scanner.nextRows();
-      while (rri.hasNext()) {
-        int key = rri.next().getInt(0);
-        if (key < previousRow) {
-          tableBoundariesCount++;
+    try {
+      int rowCount = 0;
+      int previousRow = -1;
+      int tableBoundariesCount = 0;
+      if (scanner.hasMoreRows()) {
+        RowResultIterator rri = scanner.nextRows();
+        while (rri.hasNext()) {
+          int key = rri.next().getInt(0);
+          if (key < previousRow) {
+            tableBoundariesCount++;
+          }
+          previousRow = key;
+          rowCount++;
         }
-        previousRow = key;
-        rowCount++;
       }
-    }
 
-    if (!finishFirstScan) {
-      if (shouldRestart) {
-        restartTabletServer(scanner.currentTablet());
-      } else {
-        killTabletLeader(scanner.currentTablet());
+      if (!finishFirstScan) {
+        if (restart) {
+          restartTabletServer(scanner.currentTablet());
+        } else {
+          killTabletLeader(scanner.currentTablet());
+        }
       }
-    }
 
-    boolean failureInjected = false;
-    while (scanner.hasMoreRows()) {
-      RowResultIterator rri = scanner.nextRows();
-      while (rri.hasNext()) {
-        int key = rri.next().getInt(0);
-        if (key < previousRow) {
-          tableBoundariesCount++;
-          if (finishFirstScan && !failureInjected) {
-            if (shouldRestart) {
-              restartTabletServer(scanner.currentTablet());
-            } else {
-              killTabletLeader(scanner.currentTablet());
+      boolean failureInjected = false;
+      while (scanner.hasMoreRows()) {
+        RowResultIterator rri = scanner.nextRows();
+        while (rri.hasNext()) {
+          int key = rri.next().getInt(0);
+          if (key < previousRow) {
+            tableBoundariesCount++;
+            if (finishFirstScan && !failureInjected) {
+              if (restart) {
+                restartTabletServer(scanner.currentTablet());
+              } else {
+                killTabletLeader(scanner.currentTablet());
+              }
+              failureInjected = true;
             }
-            failureInjected = true;
           }
+          previousRow = key;
+          rowCount++;
         }
-        previousRow = key;
-        rowCount++;
       }
+
+      assertEquals(ROW_COUNT, rowCount);
+      assertEquals(TABLET_COUNT, tableBoundariesCount);
+    } finally {
+      scanner.close();
     }
+  }
+
+  /**
+   * Injects failures (disconnecting or shutting down the client connection) while scanning,
+   * to verify that both non-fault tolerant and fault tolerant scanners continue as expected.
+   *
+   * @param shutDown if true shutdown client connection, otherwise disconnect
+   * @param isFaultTolerant if true uses fault tolerant scanner, otherwise
+   *                        uses non fault-tolerant one
+   * @throws KuduException
+   */
+  void clientFaultInjection(boolean shutDown, boolean isFaultTolerant) throws KuduException {
+    KuduScanner scanner = syncClient.newScannerBuilder(table)
+        .setFaultTolerant(isFaultTolerant)
+        .batchSizeBytes(1)
+        .build();
+
+    try {
+      int rowCount = 0;
+      int loopCount = 0;
+      if (scanner.hasMoreRows()) {
+        loopCount++;
+        RowResultIterator rri = scanner.nextRows();
+        rowCount += rri.getNumRows();
+      }
 
-    assertEquals(ROW_COUNT, rowCount);
-    assertEquals(TABLET_COUNT, tableBoundariesCount);
+      // Forcefully shuts down/disconnects the current connection and
+      // fails all outstanding RPCs in the middle of scanning.
+      if (shutDown) {
+        client.shutdownConnection(scanner.currentTablet(),
+                scanner.getReplicaSelection());
+      } else {
+        client.disconnect(scanner.currentTablet(),
+                scanner.getReplicaSelection());
+      }
+
+      while (scanner.hasMoreRows()) {
+        loopCount++;
+        RowResultIterator rri = scanner.nextRows();
+        rowCount += rri.getNumRows();
+      }
+
+      assertTrue(loopCount > TABLET_COUNT);
+      assertEquals(ROW_COUNT, rowCount);
+    } finally {
+      scanner.close();
+    }
   }
 }