You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/13 05:59:58 UTC
[kudu] branch master updated: KUDU-2612: fix txn keepalive failover
in Java client
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 096f1dd KUDU-2612: fix txn keepalive failover in Java client
096f1dd is described below
commit 096f1ddf09047ea11d78a661010dd549ffa9af51
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Apr 11 14:23:58 2021 -0700
KUDU-2612: fix txn keepalive failover in Java client
Prior to this patch, transaction keepalive heartbeater in Java client
wouldn't switch to a different TxnManager instance fast enough when
current TxnManager is no longer available (e.g., corresponding Kudu
master stopped or shutdown). That might lead to unintended termination
of a multi-row transaction if a leader master was shutdown while the
transaction was in progress. The issue was due to:
* using long timeout (i.e. the default one) for issuing
KeepTransactionAlive RPCs
* sending KeepTransactionAlive RPCs synchronously on the client's
HashedWheelTimer timer which was supposed to run only short tasks
This patch fixes the issue mentioned above and adds a new test scenario
which verifies that Kudu Java client switches to a different TxnManager
instance when the previously used one is no longer available.
Change-Id: I27ecbf3063d0657a20741088060d8562f8c40bc7
Reviewed-on: http://gerrit.cloudera.org:8080/17305
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 11 +-
.../org/apache/kudu/client/KuduTransaction.java | 173 ++++++++++++++-------
.../apache/kudu/client/TestKuduTransaction.java | 68 ++++++++
3 files changed, 192 insertions(+), 60 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0710db8..34996a5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2200,9 +2200,10 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * This methods enable putting RPCs on hold for a period of time determined by
- * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of time/retries, its errback will
- * be immediately called.
+ * This method puts RPC on hold for a time interval determined by
+ * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of
+ * time/retries, its errback is called immediately.
+ *
* @param rpc the RPC to retry later
* @param ex the reason why we need to retry
* @return a Deferred object to use if this method is called inline with the user's original
@@ -2726,8 +2727,8 @@ public class AsyncKuduClient implements AutoCloseable {
private long defaultAdminOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
private long defaultOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
- private final HashedWheelTimer timer =
- new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
+ private final HashedWheelTimer timer = new HashedWheelTimer(
+ new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
private Executor workerExecutor;
private int workerCount = DEFAULT_WORKER_COUNT;
private boolean statisticsDisabled = false;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index ed61497..9568971 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -18,7 +18,6 @@
package org.apache.kudu.client;
import java.io.IOException;
-import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedInputStream;
@@ -603,6 +602,25 @@ public class KuduTransaction implements AutoCloseable {
return period;
}
+ /**
+ * Return timeout for sending heartbeat messages given the specified
+ * keepalive timeout for a transaction (both in milliseconds). If something
+ * goes wrong and keepalive RPC fails, it should be possible to retry sending
+ * keepalive message a couple of times before the transaction is automatically
+ * aborted by the backend after not receiving keepalive messages for longer
+ * than the keepalive timeout for the transaction.
+ *
+ * @param keepaliveMillis keepalive timeout interval for a transaction (ms)
+ * @return a proper timeout for sending keepalive RPC from the client side
+ */
+ private static long keepaliveRequestTimeout(long keepaliveMillis) {
+ long timeout = keepalivePeriodForTimeout(keepaliveMillis) / 2;
+ if (timeout <= 0) {
+ timeout = 1;
+ }
+ return timeout;
+ }
+
private void startKeepaliveHeartbeating() {
if (keepaliveEnabled) {
LOG.debug("starting keepalive heartbeating with period {} ms (txn ID {})",
@@ -613,65 +631,110 @@ public class KuduTransaction implements AutoCloseable {
}
}
- private final class SendKeepaliveTask implements TimerTask {
- /**
- * Send keepalive heartbeat message for the transaction represented by
- * this {@link KuduTransaction} handle and re-schedule itself
- * (i.e. this task) to send next heartbeat interval
- *
- * @param timeout a handle which is associated with this task
- */
- @Override
- public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled()) {
- LOG.debug("terminating keepalive task (txn ID {})", txnId);
- return;
- }
- try {
- doSendKeepalive();
- } catch (RecoverableException e) {
- // Just continue sending heartbeats as required: the recoverable
- // exception means the condition is transient.
- // TODO(aserbin): should we send next heartbeat sooner? E.g., retry
- // immediately, and do such retry only once after a
- // failure like this. The idea is to avoid missing
- // heartbeats in situations where the second attempt
- // after keepaliveMillis/2 would as well due to a network
- // issue, but immediate retry could succeed.
- LOG.debug("continuing keepalive heartbeating (txn ID {}): {}",
- txnId, e.toString());
- } catch (Exception e) {
- LOG.debug("terminating keepalive task (txn ID {}) due to exception {}",
- txnId, e.toString());
- return;
- }
- synchronized (keepaliveTaskHandleSync) {
- // Re-schedule the task, refreshing the task handle.
- keepaliveTaskHandle = AsyncKuduClient.newTimeout(
- timeout.timer(), this, keepalivePeriodForTimeout(keepaliveMillis));
- }
- }
-
- private void doSendKeepalive() throws KuduException {
- KeepTransactionAliveRequest request = new KeepTransactionAliveRequest(
- client.getMasterTable(),
- client.getTimer(),
- client.getDefaultAdminOperationTimeoutMs(),
- txnId);
- Deferred<KeepTransactionAliveResponse> d = client.sendRpcToTablet(request);
- KuduClient.joinAndHandleException(d);
- }
- }
-
void doStartKeepaliveHeartbeating() {
Preconditions.checkState(keepaliveEnabled);
+ Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
synchronized (keepaliveTaskHandleSync) {
Preconditions.checkState(keepaliveTaskHandle == null,
"keepalive heartbeating has already started");
- keepaliveTaskHandle = AsyncKuduClient.newTimeout(
- client.getTimer(),
- new SendKeepaliveTask(),
- keepalivePeriodForTimeout(keepaliveMillis));
+ long sleepTimeMillis = keepalivePeriodForTimeout(keepaliveMillis);
+ keepaliveTaskHandle = delayedSendKeepTransactionAlive(sleepTimeMillis,
+ getSendKeepTransactionAliveCB(), getSendKeepTransactionAliveEB());
}
}
+
+ /**
+ * Send keepalive message to TxnManager for this transaction.
+ *
+ * @return a future object to handle the results of the sent RPC
+ */
+ private Deferred<KeepTransactionAliveResponse> doSendKeepTransactionAlive() {
+ // The timeout for the keepalive RPC is dictated by the keepalive
+ // timeout for the transaction.
+ long timeoutMs = keepaliveRequestTimeout(keepaliveMillis);
+ KeepTransactionAliveRequest request = new KeepTransactionAliveRequest(
+ client.getMasterTable(), client.getTimer(), timeoutMs, txnId);
+ return client.sendRpcToTablet(request);
+ }
+
+ /**
+ * Schedule a timer to send a KeepTransactiveAlive RPC to TxnManager after
+ * @c sleepTimeMillis milliseconds.
+ *
+ * @param runAfterMillis time delta from now when to run the task
+ * @param callback callback to call on successfully sent RPC
+ * @param errback errback to call if something goes wrong with sending RPC
+ */
+ private Timeout delayedSendKeepTransactionAlive(
+ long runAfterMillis,
+ final Callback<Void, KeepTransactionAliveResponse> callback,
+ final Callback<Void, Exception> errback) {
+
+ final class RetryTimer implements TimerTask {
+ @Override
+ public void run(final Timeout timeout) {
+ doSendKeepTransactionAlive().addCallbacks(callback, errback);
+ }
+ }
+
+ return client.newTimeout(client.getTimer(), new RetryTimer(), runAfterMillis);
+ }
+
+ private Callback<Void, KeepTransactionAliveResponse> getSendKeepTransactionAliveCB() {
+ // Time interval to wait before sending next KeepTransactionAlive request.
+ long sleepTimeMillis = keepalivePeriodForTimeout(keepaliveMillis);
+ return resp -> {
+ // Store the Deferred locally; callback() below will reset it and we'd
+ // return a different, non-triggered Deferred.
+ synchronized (keepaliveTaskHandleSync) {
+ if (!keepaliveTaskHandle.isCancelled()) {
+ keepaliveTaskHandle = delayedSendKeepTransactionAlive(
+ sleepTimeMillis,
+ getSendKeepTransactionAliveCB(),
+ getSendKeepTransactionAliveEB());
+ }
+ }
+ return null;
+ };
+ }
+
+ private Callback<Void, Exception> getSendKeepTransactionAliveEB() {
+ return e -> {
+ boolean scheduleNextRun = false;
+ long nextRunAfterMillis = -1;
+ if (e instanceof RecoverableException) {
+ scheduleNextRun = true;
+ nextRunAfterMillis = keepaliveRequestTimeout(keepaliveMillis);
+ // Continue sending heartbeats as required: the recoverable exception
+ // means the condition is transient. However, attempt sending next
+ // keepalive message sooner since one has just been missed.
+ LOG.debug("continuing keepalive heartbeating (txn ID {}): {}",
+ txnId, e.toString());
+ } else if (e instanceof NonRecoverableException) {
+ NonRecoverableException ex = (NonRecoverableException) e;
+ if (ex.getStatus().isTimedOut()) {
+ // Send next keepalive message sooner: it's been a long timeout.
+ scheduleNextRun = true;
+ nextRunAfterMillis = 1;
+ LOG.debug("sending keepalive message after prior one timed out (txn ID {}): {}",
+ txnId, e.toString());
+ } else {
+ LOG.debug("terminating keepalive task (txn ID {}) due to exception {}",
+ txnId, e.toString());
+ }
+ }
+ if (scheduleNextRun) {
+ Preconditions.checkArgument(nextRunAfterMillis >= 0);
+ synchronized (keepaliveTaskHandleSync) {
+ if (!keepaliveTaskHandle.isCancelled()) {
+ keepaliveTaskHandle = delayedSendKeepTransactionAlive(
+ nextRunAfterMillis,
+ getSendKeepTransactionAliveCB(),
+ getSendKeepTransactionAliveEB());
+ }
+ }
+ }
+ return null;
+ };
+ }
}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 5c97e54..1114d85 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -873,6 +873,74 @@ public class TestKuduTransaction {
assertEquals(1, scanner.nextRows().getNumRows());
}
+ /**
+ * Test to verify that Kudu client is able to switch to another TxnManager
+ * instance when the kudu-master process which hosts currently used TxnManager
+ * becomes temporarily unavailable (e.g. shut down, restarted, stopped, etc.).
+ *
+ * The essence of this scenario is to make sure that Kudu Java client connects
+ * to a different TxnManager instance and starts sending txn keepalive
+ * messages there in a timely manner, keeping the transaction alive even if
+ * the originally used TxnManager instance isn't available.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ // TxnManager functionality is necessary for this scenario.
+ "--txn_manager_enabled",
+
+ // Set Raft heartbeat interval short for faster test runtime: speed up
+ // leader failure detection and new leader election.
+ "--raft_heartbeat_interval_ms=100",
+ })
+ @TabletServerConfig(flags = {
+ // The txn keepalive interval should be long enough to accommodate Raft
+ // leader failure detection and election.
+ "--txn_keepalive_interval_ms=3000",
+ "--txn_staleness_tracker_interval_ms=500"
+ })
+ public void testTxnKeepaliveSwitchesToOtherTxnManager() throws Exception {
+ final String TABLE_NAME = "txn_manager_fallback";
+ client.createTable(
+ TABLE_NAME,
+ ClientTestUtil.getBasicSchema(),
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ Insert insert = createBasicSchemaInsert(table, 0);
+ session.apply(insert);
+ session.flush();
+
+ harness.killLeaderMasterServer();
+
+ // Wait for two keepalive intervals to make sure the backend got a chance
+ // to automatically abort the transaction if not receiving txn keepalive
+ // messages.
+ Thread.sleep(2 * 3000);
+
+ // It should be possible to commit the transaction. This is to verify that
+ //
+ // * the client eventually starts sending txn keepalive messages to other
+ // TxnManager instance (the original was hosted by former leader master
+ // which is no longer available), so the backend doesn't abort the
+ // transaction automatically due to not receiving keepalive messages
+ //
+ // * the client switches to the new TxnManager for other txn-related
+ // operations as well
+ txn.commit(true /*wait*/);
+
+ // An extra sanity check: read back the row written into the table in the
+ // context of the transaction.
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+ .replicaSelection(ReplicaSelection.LEADER_ONLY)
+ .build();
+
+ assertEquals(1, scanner.nextRows().getNumRows());
+ }
+
// TODO(aserbin): when test harness allows for sending Kudu servers particular
// signals, add a test scenario to verify that timeout for
// TxnManager request is set low enough to detect 'frozen'