You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/09 23:06:34 UTC
[kudu] branch master updated: KUDU-2612: Java client failover
scenarios for TxnManager
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0a32542 KUDU-2612: Java client failover scenarios for TxnManager
0a32542 is described below
commit 0a32542ca5669c842e91b1b736d108a7ae84ec5c
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Mar 7 21:42:26 2021 -0800
KUDU-2612: Java client failover scenarios for TxnManager
Added a couple of test scenarios to verify that the Java client
automatically switches to another available TxnManager for performing
txn-related operations. Sending txn keep-alive messages isn't covered
yet: I'm planning to address that separately, since some extra changes
are needed there which would be easier to review in a separate patch.
I also updated the inline doc for KuduTransaction.isCommitComplete()
to be more explicit about possible exceptions thrown.
Change-Id: I9c7d73ce4a74d426286facbf02dff6e48b46a7c0
Reviewed-on: http://gerrit.cloudera.org:8080/17297
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../org/apache/kudu/client/KuduTransaction.java | 6 +
.../main/java/org/apache/kudu/client/RpcProxy.java | 2 +-
.../apache/kudu/client/TestKuduTransaction.java | 174 ++++++++++++++++++++-
3 files changed, 177 insertions(+), 5 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 2e7bf41..ed61497 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -301,6 +301,12 @@ public class KuduTransaction implements AutoCloseable {
* Check whether the commit phase for a transaction is complete.
*
* @return {@code true} if transaction has finalized, otherwise {@code false}
+ * @throws NonRecoverableException with Status.Aborted()
+ * if transaction has been or is being aborted
+ * @throws NonRecoverableException with Status.IllegalState()
+ * if transaction is still open (i.e. commit() hasn't been called yet)
+ * @throws NonRecoverableException with Status.NotSupported()
+ * if transaction is in unexpected state (non-compatible backend?)
* @throws KuduException if an error happens while querying the system about
* the state of the transaction
*/
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 28784ab..6c4350f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -442,7 +442,7 @@ class RpcProxy {
// TODO(aserbin): try sending request to other TxnManager instance,
// if possible. The idea is that Kudu clusters are expected
- // expected to have multiple masters, so if one TxnManager
+ // to have multiple masters, so if one TxnManager
// instance is not available, there is a high chance that
// others are still available (TxnManager is hosted by a
// kudu-master process).
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 5b8b035..5c97e54 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -17,6 +17,7 @@
package org.apache.kudu.client;
+import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -28,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.junit.Before;
@@ -35,6 +37,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
+import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
@@ -446,7 +449,7 @@ public class TestKuduTransaction {
*/
@Test(timeout = 100000)
@MasterServerConfig(flags = {
- "--txn_manager_enabled=true",
+ "--txn_manager_enabled",
})
@TabletServerConfig(flags = {
"--txn_schedule_background_tasks=false"
@@ -497,7 +500,7 @@ public class TestKuduTransaction {
*/
@Test(timeout = 100000)
@MasterServerConfig(flags = {
- "--txn_manager_enabled=true",
+ "--txn_manager_enabled",
})
public void testSerializationOptions() throws Exception {
final KuduTransaction txn = client.newTransaction();
@@ -560,7 +563,7 @@ public class TestKuduTransaction {
*/
@Test(timeout = 100000)
@MasterServerConfig(flags = {
- "--txn_manager_enabled=true",
+ "--txn_manager_enabled",
})
@TabletServerConfig(flags = {
"--txn_keepalive_interval_ms=200",
@@ -628,7 +631,7 @@ public class TestKuduTransaction {
*/
@Test(timeout = 100000)
@MasterServerConfig(flags = {
- "--txn_manager_enabled=true",
+ "--txn_manager_enabled",
})
@TabletServerConfig(flags = {
"--txn_keepalive_interval_ms=200",
@@ -713,4 +716,167 @@ public class TestKuduTransaction {
txn.isCommitComplete();
}
}
+
+ /**
+ * Test to verify that the Kudu client is able to switch to a TxnManager hosted
+ * by another kudu-master process when the previously used one isn't available.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ // TxnManager functionality is necessary for this scenario.
+ "--txn_manager_enabled",
+
+ // Set Raft heartbeat interval short for faster test runtime: speed up
+ // leader failure detection and new leader election.
+ "--raft_heartbeat_interval_ms=100",
+ })
+ public void testSwitchToOtherTxnManager() throws Exception {
+ final String TABLE_NAME = "txn_manager_ops_fallback";
+ client.createTable(
+ TABLE_NAME,
+ ClientTestUtil.getBasicSchema(),
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+ // Start a transaction, then restart every available TxnManager instance
+ // before attempting any txn-related operation.
+ {
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ Insert insert = createBasicSchemaInsert(table, 0);
+ session.apply(insert);
+ session.flush();
+
+ harness.killAllMasterServers();
+ harness.startAllMasterServers();
+
+ // Querying the status of a transaction should be possible, as usual.
+ // Since the transaction is still open, KuduTransaction.isCommitComplete()
+ // should throw the corresponding exception with Status.IllegalState.
+ try {
+ txn.isCommitComplete();
+ fail("KuduTransaction.isCommitComplete should have thrown");
+ } catch (NonRecoverableException e) {
+ assertTrue(e.getStatus().toString(), e.getStatus().isIllegalState());
+ assertEquals("transaction is still open", e.getMessage());
+ }
+
+ harness.killAllMasterServers();
+ harness.startAllMasterServers();
+
+ // It should be possible to commit the transaction.
+ txn.commit(true /*wait*/);
+
+ // An extra sanity check: read back the row written into the table in the
+ // context of the transaction.
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+ .replicaSelection(ReplicaSelection.LEADER_ONLY)
+ .build();
+
+ assertEquals(1, scanner.nextRows().getNumRows());
+ }
+
+ // Similar to the above, but run KuduTransaction.commit() when only 2 out
+ // of 3 masters are running while the TxnManager which was used to start the
+ // transaction is no longer around.
+ {
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ Insert insert = createBasicSchemaInsert(table, 1);
+ session.apply(insert);
+ session.flush();
+
+ harness.killLeaderMasterServer();
+
+ // It should be possible to commit the transaction: 2 out of 3 masters are
+ // running and Raft should be able to establish a leader master. So,
+ // txn-related operations routed through TxnManager should succeed.
+ txn.commit(true /*wait*/);
+
+ // An extra sanity check: read back the row written into the table in the
+ // context of the transaction.
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+ .replicaSelection(ReplicaSelection.LEADER_ONLY)
+ .build();
+
+ // NOTE(review): this txn inserted key 1 above, so it isn't empty; only one
+ // nextRows() batch is counted here -- confirm why 1 row (not 2) is expected.
+ assertEquals(1, scanner.nextRows().getNumRows());
+ }
+ }
+
+ /**
+ * Test to verify that the Kudu client is able to switch to a TxnManager hosted
+ * by another kudu-master process when the previously used one isn't available,
+ * even if txn-related calls are first issued when no TxnManager is running.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ // TxnManager functionality is necessary for this scenario.
+ "--txn_manager_enabled",
+
+ // Set Raft heartbeat interval short for faster test runtime: speed up
+ // leader failure detection and new leader election.
+ "--raft_heartbeat_interval_ms=100",
+ })
+ public void testSwitchToOtherTxnManagerInFlightCalls() throws Exception {
+ final String TABLE_NAME = "txn_manager_ops_fallback_inflight";
+ client.createTable(
+ TABLE_NAME,
+ ClientTestUtil.getBasicSchema(),
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ Insert insert = createBasicSchemaInsert(table, 0);
+ session.apply(insert);
+ session.flush();
+
+ harness.killAllMasterServers();
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Sleep for some time to allow the KuduTransaction.commit() call
+ // below to issue RPCs to non-running TxnManagers.
+ Thread.sleep(1000);
+ harness.startAllMasterServers();
+ } catch (Exception e) {
+ fail("failed to start all masters: " + e);
+ }
+ }
+ });
+ t.start();
+
+ // It should be possible to commit the transaction.
+ txn.commit(true /*wait*/);
+
+ // Extra sanity check: the helper thread should join fast, otherwise commit()
+ // above could not succeed. NOTE(review): join(250) outcome is not asserted.
+ t.join(250);
+
+ // An extra sanity check: read back the row written into the table in the
+ // context of the transaction.
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+ .replicaSelection(ReplicaSelection.LEADER_ONLY)
+ .build();
+
+ assertEquals(1, scanner.nextRows().getNumRows());
+ }
+
+ // TODO(aserbin): when test harness allows for sending Kudu servers particular
+ // signals, add a test scenario to verify that the timeout for
+ // TxnManager requests is set low enough to detect a 'frozen'
+ // TxnManager instance (e.g., one sent SIGSTOP), and the client
+ // is able to switch to another TxnManager to send txn keepalive
+ // requests fast enough to keep the transaction alive.
}