You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/09 23:06:34 UTC

[kudu] branch master updated: KUDU-2612: Java client failover scenarios for TxnManager

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a32542  KUDU-2612: Java client failover scenarios for TxnManager
0a32542 is described below

commit 0a32542ca5669c842e91b1b736d108a7ae84ec5c
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Mar 7 21:42:26 2021 -0800

    KUDU-2612: Java client failover scenarios for TxnManager
    
    Added a couple of test scenarios to verify that the Java client
    automatically switches to another available TxnManager for performing
    txn-related operations.  Sending txn keep-alive messages isn't covered
    yet: I'm planning to address that separately since some extra changes
    are needed there which would be easier to review in a separate patch.
    
    I also updated the inline doc for KuduTransaction.isCommitComplete()
    to be more explicit about possible exceptions thrown.
    
    Change-Id: I9c7d73ce4a74d426286facbf02dff6e48b46a7c0
    Reviewed-on: http://gerrit.cloudera.org:8080/17297
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/KuduTransaction.java    |   6 +
 .../main/java/org/apache/kudu/client/RpcProxy.java |   2 +-
 .../apache/kudu/client/TestKuduTransaction.java    | 174 ++++++++++++++++++++-
 3 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 2e7bf41..ed61497 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -301,6 +301,12 @@ public class KuduTransaction implements AutoCloseable {
    * Check whether the commit phase for a transaction is complete.
    *
    * @return {@code true} if transaction has finalized, otherwise {@code false}
+   * @throws NonRecoverableException with Status.Aborted()
+   *   if transaction has been or is being aborted
+   * @throws NonRecoverableException with Status.IllegalState()
+   *   if transaction is still open (i.e. commit() hasn't been called yet)
+   * @throws NonRecoverableException with Status.NotSupported()
+   *   if transaction is in unexpected state (non-compatible backend?)
    * @throws KuduException if an error happens while querying the system about
    *                       the state of the transaction
    */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 28784ab..6c4350f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -442,7 +442,7 @@ class RpcProxy {
 
     // TODO(aserbin): try sending request to other TxnManager instance,
     //                if possible. The idea is that Kudu clusters are expected
-    //                expected to have multiple masters, so if one TxnManager
+    //                to have multiple masters, so if one TxnManager
     //                instance is not available, there is a high chance that
     //                others are still available (TxnManager is hosted by a
     //                kudu-master process).
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 5b8b035..5c97e54 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -28,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import org.junit.Before;
@@ -35,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.function.ThrowingRunnable;
 
+import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
@@ -446,7 +449,7 @@ public class TestKuduTransaction {
    */
   @Test(timeout = 100000)
   @MasterServerConfig(flags = {
-      "--txn_manager_enabled=true",
+      "--txn_manager_enabled",
   })
   @TabletServerConfig(flags = {
       "--txn_schedule_background_tasks=false"
@@ -497,7 +500,7 @@ public class TestKuduTransaction {
    */
   @Test(timeout = 100000)
   @MasterServerConfig(flags = {
-      "--txn_manager_enabled=true",
+      "--txn_manager_enabled",
   })
   public void testSerializationOptions() throws Exception {
     final KuduTransaction txn = client.newTransaction();
@@ -560,7 +563,7 @@ public class TestKuduTransaction {
    */
   @Test(timeout = 100000)
   @MasterServerConfig(flags = {
-      "--txn_manager_enabled=true",
+      "--txn_manager_enabled",
   })
   @TabletServerConfig(flags = {
       "--txn_keepalive_interval_ms=200",
@@ -628,7 +631,7 @@ public class TestKuduTransaction {
    */
   @Test(timeout = 100000)
   @MasterServerConfig(flags = {
-      "--txn_manager_enabled=true",
+      "--txn_manager_enabled",
   })
   @TabletServerConfig(flags = {
       "--txn_keepalive_interval_ms=200",
@@ -713,4 +716,167 @@ public class TestKuduTransaction {
       txn.isCommitComplete();
     }
   }
+
+  /**
+   * Test to verify that Kudu client is able to switch to TxnManager hosted by
+   * other kudu-master process when the previously used one isn't available.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+
+      // Set Raft heartbeat interval short for faster test runtime: speed up
+      // leader failure detection and new leader election.
+      "--raft_heartbeat_interval_ms=100",
+  })
+  public void testSwitchToOtherTxnManager() throws Exception {
+    final String TABLE_NAME = "txn_manager_ops_fallback";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+    // Start a transaction, then restart every available TxnManager instance
+    // before attempting any txn-related operation.
+    {
+      KuduTransaction txn = client.newTransaction();
+      KuduSession session = txn.newKuduSession();
+
+      KuduTable table = client.openTable(TABLE_NAME);
+
+      Insert insert = createBasicSchemaInsert(table, 0);
+      session.apply(insert);
+      session.flush();
+
+      harness.killAllMasterServers();
+      harness.startAllMasterServers();
+
+      // Querying the status of a transaction should be possible, as usual.
+      // Since the transaction is still open, KuduTransaction.isCommitComplete()
+      // should throw corresponding exception with Status.IllegalState.
+      try {
+        txn.isCommitComplete();
+        fail("KuduTransaction.isCommitComplete should have thrown");
+      } catch (NonRecoverableException e) {
+        assertTrue(e.getStatus().toString(), e.getStatus().isIllegalState());
+        assertEquals("transaction is still open", e.getMessage());
+      }
+
+      harness.killAllMasterServers();
+      harness.startAllMasterServers();
+
+      // It should be possible to commit the transaction.
+      txn.commit(true /*wait*/);
+
+      // An extra sanity check: read back the row written into the table in the
+      // context of the transaction.
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+          .replicaSelection(ReplicaSelection.LEADER_ONLY)
+          .build();
+
+      assertEquals(1, scanner.nextRows().getNumRows());
+    }
+
+    // Similar to the above, but run KuduTransaction.commit() when only 2 out
+    // of 3 masters are running and the TxnManager which was used to start the
+    // transaction is no longer around.
+    {
+      KuduTransaction txn = client.newTransaction();
+      KuduSession session = txn.newKuduSession();
+
+      KuduTable table = client.openTable(TABLE_NAME);
+
+      Insert insert = createBasicSchemaInsert(table, 1);
+      session.apply(insert);
+      session.flush();
+
+      harness.killLeaderMasterServer();
+
+      // It should be possible to commit the transaction: 2 out of 3 masters are
+      // running and Raft should be able to establish a leader master. So,
+      // txn-related operations routed through TxnManager should succeed.
+      txn.commit(true /*wait*/);
+
+      // An extra sanity check: read back the row written into the table in the
+      // context of the transaction.
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+          .replicaSelection(ReplicaSelection.LEADER_ONLY)
+          .build();
+
+      // It's an empty transaction, and 1 row should be there from the prior
+      // sub-scenario.
+      assertEquals(1, scanner.nextRows().getNumRows());
+    }
+  }
+
+  /**
+   * Test to verify that the Kudu client is able to switch to a TxnManager hosted
+   * by another kudu-master process when the previously used one isn't available,
+   * even if txn-related calls are first issued while no TxnManager is running.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+
+      // Set Raft heartbeat interval short for faster test runtime: speed up
+      // leader failure detection and new leader election.
+      "--raft_heartbeat_interval_ms=100",
+  })
+  public void testSwitchToOtherTxnManagerInFlightCalls() throws Exception {
+    final String TABLE_NAME = "txn_manager_ops_fallback_inflight";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+    KuduTransaction txn = client.newTransaction();
+    KuduSession session = txn.newKuduSession();
+
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    Insert insert = createBasicSchemaInsert(table, 0);
+    session.apply(insert);
+    session.flush();
+
+    harness.killAllMasterServers();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Sleep for some time to allow the KuduTransaction.commit() call
+          // below to issue RPCs to non-running TxnManagers.
+          Thread.sleep(1000);
+          harness.startAllMasterServers();
+        } catch (Exception e) {
+          fail("failed to start all masters: " + e);
+        }
+      }
+    });
+    t.start();
+
+    // It should be possible to commit the transaction.
+    txn.commit(true /*wait*/);
+
+    // Just an extra sanity check: the thread should join pretty fast, otherwise
+    // the call to KuduTransaction.commit() above could not succeed.
+    t.join(250);
+
+    // An extra sanity check: read back the row written into the table in the
+    // context of the transaction.
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+        .replicaSelection(ReplicaSelection.LEADER_ONLY)
+        .build();
+
+    assertEquals(1, scanner.nextRows().getNumRows());
+  }
+
+  // TODO(aserbin): when the test harness allows for sending signals to Kudu
+  //                servers, add a test scenario to verify that the timeout for
+  //                TxnManager requests is low enough to detect a 'frozen'
+  //                TxnManager instance (e.g., one sent SIGSTOP), and that the
+  //                client is able to switch to another TxnManager to send txn
+  //                keepalive requests fast enough to keep the transaction alive.
 }