You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/13 05:59:58 UTC

[kudu] branch master updated: KUDU-2612: fix txn keepalive failover in Java client

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 096f1dd  KUDU-2612: fix txn keepalive failover in Java client
096f1dd is described below

commit 096f1ddf09047ea11d78a661010dd549ffa9af51
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Apr 11 14:23:58 2021 -0700

    KUDU-2612: fix txn keepalive failover in Java client
    
    Prior to this patch, the transaction keepalive heartbeater in the Java
    client wouldn't switch to a different TxnManager instance fast enough when
    the current TxnManager was no longer available (e.g., the corresponding
    Kudu master was stopped or shut down).  That might lead to unintended
    termination of a multi-row transaction if the leader master was shut down
    while the transaction was in progress.  The issue was due to:
      * using a long timeout (i.e. the default admin operation timeout) for
        issuing KeepTransactionAlive RPCs
      * sending KeepTransactionAlive RPCs synchronously on the client's
        HashedWheelTimer thread, which is supposed to run only short tasks
    
    This patch fixes the issue mentioned above and adds a new test scenario
    which verifies that the Kudu Java client switches to a different
    TxnManager instance when the previously used one is no longer available.
    
    Change-Id: I27ecbf3063d0657a20741088060d8562f8c40bc7
    Reviewed-on: http://gerrit.cloudera.org:8080/17305
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  11 +-
 .../org/apache/kudu/client/KuduTransaction.java    | 173 ++++++++++++++-------
 .../apache/kudu/client/TestKuduTransaction.java    |  68 ++++++++
 3 files changed, 192 insertions(+), 60 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0710db8..34996a5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2200,9 +2200,10 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * This methods enable putting RPCs on hold for a period of time determined by
-   * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of time/retries, its errback will
-   * be immediately called.
+   * This method puts an RPC on hold for a time interval determined by
+   * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of
+   * time/retries, its errback is called immediately.
+   *
    * @param rpc the RPC to retry later
    * @param ex the reason why we need to retry
    * @return a Deferred object to use if this method is called inline with the user's original
@@ -2726,8 +2727,8 @@ public class AsyncKuduClient implements AutoCloseable {
     private long defaultAdminOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
     private long defaultOperationTimeoutMs = DEFAULT_OPERATION_TIMEOUT_MS;
 
-    private final HashedWheelTimer timer =
-        new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
+    private final HashedWheelTimer timer = new HashedWheelTimer(
+        new ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
     private Executor workerExecutor;
     private int workerCount = DEFAULT_WORKER_COUNT;
     private boolean statisticsDisabled = false;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index ed61497..9568971 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -18,7 +18,6 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
-import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedInputStream;
@@ -603,6 +602,25 @@ public class KuduTransaction implements AutoCloseable {
     return period;
   }
 
+  /**
+   * Compute the timeout for a single KeepTransactionAlive RPC given the
+   * keepalive timeout of a transaction (both in milliseconds). The RPC
+   * timeout is half of the heartbeat period, so a failed keepalive RPC can
+   * be retried a couple of times before the backend automatically aborts
+   * the transaction for not receiving keepalive messages for longer than
+   * its keepalive timeout.
+   *
+   * @param keepaliveMillis keepalive timeout interval for a transaction (ms)
+   * @return a proper timeout for sending keepalive RPC from the client side;
+   *         never less than 1 ms
+   */
+  private static long keepaliveRequestTimeout(long keepaliveMillis) {
+    // Half of the heartbeat period; a non-positive RPC timeout makes no
+    // sense, so clamp the result to at least 1 millisecond.
+    final long halfPeriodMillis = keepalivePeriodForTimeout(keepaliveMillis) / 2;
+    return Math.max(1L, halfPeriodMillis);
+  }
+
   private void startKeepaliveHeartbeating() {
     if (keepaliveEnabled) {
       LOG.debug("starting keepalive heartbeating with period {} ms (txn ID {})",
@@ -613,65 +631,127 @@ public class KuduTransaction implements AutoCloseable {
     }
   }
 
-  private final class SendKeepaliveTask implements TimerTask {
-    /**
-     * Send keepalive heartbeat message for the transaction represented by
-     * this {@link KuduTransaction} handle and re-schedule itself
-     * (i.e. this task) to send next heartbeat interval
-     *
-     * @param timeout a handle which is associated with this task
-     */
-    @Override
-    public void run(Timeout timeout) throws Exception {
-      if (timeout.isCancelled()) {
-        LOG.debug("terminating keepalive task (txn ID {})", txnId);
-        return;
-      }
-      try {
-        doSendKeepalive();
-      } catch (RecoverableException e) {
-        // Just continue sending heartbeats as required: the recoverable
-        // exception means the condition is transient.
-        // TODO(aserbin): should we send next heartbeat sooner? E.g., retry
-        //                immediately, and do such retry only once after a
-        //                failure like this. The idea is to avoid missing
-        //                heartbeats in situations where the second attempt
-        //                after keepaliveMillis/2 would as well due to a network
-        //                issue, but immediate retry could succeed.
-        LOG.debug("continuing keepalive heartbeating (txn ID {}): {}",
-            txnId, e.toString());
-      } catch (Exception e) {
-        LOG.debug("terminating keepalive task (txn ID {}) due to exception {}",
-            txnId, e.toString());
-        return;
-      }
-      synchronized (keepaliveTaskHandleSync) {
-        // Re-schedule the task, refreshing the task handle.
-        keepaliveTaskHandle = AsyncKuduClient.newTimeout(
-            timeout.timer(), this, keepalivePeriodForTimeout(keepaliveMillis));
-      }
-    }
-
-    private void doSendKeepalive() throws KuduException {
-      KeepTransactionAliveRequest request = new KeepTransactionAliveRequest(
-          client.getMasterTable(),
-          client.getTimer(),
-          client.getDefaultAdminOperationTimeoutMs(),
-          txnId);
-      Deferred<KeepTransactionAliveResponse> d = client.sendRpcToTablet(request);
-      KuduClient.joinAndHandleException(d);
-    }
-  }
-
   void doStartKeepaliveHeartbeating() {
     Preconditions.checkState(keepaliveEnabled);
+    Preconditions.checkState(txnId > AsyncKuduClient.INVALID_TXN_ID,
+        "invalid transaction ID: %s", txnId);
     synchronized (keepaliveTaskHandleSync) {
       Preconditions.checkState(keepaliveTaskHandle == null,
           "keepalive heartbeating has already started");
-      keepaliveTaskHandle = AsyncKuduClient.newTimeout(
-          client.getTimer(),
-          new SendKeepaliveTask(),
-          keepalivePeriodForTimeout(keepaliveMillis));
+      long sleepTimeMillis = keepalivePeriodForTimeout(keepaliveMillis);
+      keepaliveTaskHandle = delayedSendKeepTransactionAlive(sleepTimeMillis,
+          getSendKeepTransactionAliveCB(), getSendKeepTransactionAliveEB());
     }
   }
+
+  /**
+   * Send keepalive message to TxnManager for this transaction.
+   *
+   * @return a future object to handle the results of the sent RPC
+   */
+  private Deferred<KeepTransactionAliveResponse> doSendKeepTransactionAlive() {
+    // The timeout for the keepalive RPC is dictated by the keepalive timeout
+    // for the transaction rather than the default admin operation timeout:
+    // a short RPC timeout leaves room to retry (possibly with a different
+    // TxnManager) before the transaction is automatically aborted.
+    long timeoutMs = keepaliveRequestTimeout(keepaliveMillis);
+    KeepTransactionAliveRequest request = new KeepTransactionAliveRequest(
+        client.getMasterTable(), client.getTimer(), timeoutMs, txnId);
+    return client.sendRpcToTablet(request);
+  }
+
+  /**
+   * Schedule a timer task to send a KeepTransactionAlive RPC to TxnManager
+   * after {@code runAfterMillis} milliseconds. The task only issues the RPC
+   * and doesn't wait for its completion, so it doesn't block the client's
+   * timer thread; the outcome is handled by the specified callbacks.
+   *
+   * @param runAfterMillis time delta from now when to run the task
+   * @param callback callback to call on successfully sent RPC
+   * @param errback errback to call if something goes wrong with sending RPC
+   * @return handle of the scheduled timer task
+   */
+  private Timeout delayedSendKeepTransactionAlive(
+      long runAfterMillis,
+      final Callback<Void, KeepTransactionAliveResponse> callback,
+      final Callback<Void, Exception> errback) {
+
+    final class SendKeepaliveTask implements TimerTask {
+      @Override
+      public void run(final Timeout timeout) {
+        doSendKeepTransactionAlive().addCallbacks(callback, errback);
+      }
+    }
+
+    return AsyncKuduClient.newTimeout(
+        client.getTimer(), new SendKeepaliveTask(), runAfterMillis);
+  }
+
+  /**
+   * Schedule sending the next keepalive message after the specified delay,
+   * unless the current keepalive task handle has been cancelled.
+   *
+   * @param runAfterMillis time delta from now when to send next keepalive
+   */
+  private void scheduleNextKeepalive(long runAfterMillis) {
+    // NOTE(review): cancelling a timer task that has already run has no
+    // effect, so this check alone may not stop rescheduling once an in-flight
+    // RPC completes; confirm the code stopping heartbeating accounts for it.
+    synchronized (keepaliveTaskHandleSync) {
+      if (!keepaliveTaskHandle.isCancelled()) {
+        keepaliveTaskHandle = delayedSendKeepTransactionAlive(
+            runAfterMillis,
+            getSendKeepTransactionAliveCB(),
+            getSendKeepTransactionAliveEB());
+      }
+    }
+  }
+
+  /**
+   * Build a callback to run once a keepalive message has been delivered:
+   * it schedules sending the next one after the regular keepalive period.
+   *
+   * @return callback for a successfully completed KeepTransactionAlive RPC
+   */
+  private Callback<Void, KeepTransactionAliveResponse> getSendKeepTransactionAliveCB() {
+    // Time interval to wait before sending next KeepTransactionAlive request.
+    long sleepTimeMillis = keepalivePeriodForTimeout(keepaliveMillis);
+    return resp -> {
+      scheduleNextKeepalive(sleepTimeMillis);
+      return null;
+    };
+  }
+
+  /**
+   * Build an errback to run if sending a keepalive message fails. On a
+   * recoverable error or a timeout the next keepalive message is scheduled
+   * sooner than usual since one has just been missed; any other error stops
+   * keepalive heartbeating for the transaction.
+   *
+   * @return errback for a failed KeepTransactionAlive RPC
+   */
+  private Callback<Void, Exception> getSendKeepTransactionAliveEB() {
+    return e -> {
+      if (e instanceof RecoverableException) {
+        // Continue sending heartbeats as required: the recoverable exception
+        // means the condition is transient. However, attempt sending next
+        // keepalive message sooner since one has just been missed.
+        LOG.debug("continuing keepalive heartbeating (txn ID {}): {}",
+            txnId, e.toString());
+        scheduleNextKeepalive(keepaliveRequestTimeout(keepaliveMillis));
+      } else if (e instanceof NonRecoverableException
+          && ((NonRecoverableException) e).getStatus().isTimedOut()) {
+        // Send next keepalive message almost immediately: the prior attempt
+        // has already spent its whole RPC timeout.
+        LOG.debug("sending keepalive message after prior one timed out (txn ID {}): {}",
+            txnId, e.toString());
+        scheduleNextKeepalive(1);
+      } else {
+        // Any other error stops keepalive heartbeating for the transaction.
+        LOG.debug("terminating keepalive task (txn ID {}) due to exception {}",
+            txnId, e.toString());
+      }
+      return null;
+    };
+  }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 5c97e54..1114d85 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -873,6 +873,80 @@ public class TestKuduTransaction {
     assertEquals(1, scanner.nextRows().getNumRows());
   }
 
+  /**
+   * Txn keepalive interval for testTxnKeepaliveSwitchesToOtherTxnManager (ms).
+   * It should be long enough to accommodate Raft leader failure detection
+   * and election.
+   */
+  private static final int TXN_KEEPALIVE_INTERVAL_MS = 3000;
+
+  /**
+   * Test to verify that Kudu client is able to switch to another TxnManager
+   * instance when the kudu-master process which hosts the currently used
+   * TxnManager becomes temporarily unavailable (e.g. shut down, restarted,
+   * stopped, etc.).
+   *
+   * The essence of this scenario is to make sure that Kudu Java client connects
+   * to a different TxnManager instance and starts sending txn keepalive
+   * messages there in a timely manner, keeping the transaction alive even if
+   * the originally used TxnManager instance isn't available.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+
+      // Set Raft heartbeat interval short for faster test runtime: speed up
+      // leader failure detection and new leader election.
+      "--raft_heartbeat_interval_ms=100",
+  })
+  @TabletServerConfig(flags = {
+      "--txn_keepalive_interval_ms=" + TXN_KEEPALIVE_INTERVAL_MS,
+      "--txn_staleness_tracker_interval_ms=500"
+  })
+  public void testTxnKeepaliveSwitchesToOtherTxnManager() throws Exception {
+    final String tableName = "txn_manager_fallback";
+    client.createTable(
+        tableName,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+
+    KuduTransaction txn = client.newTransaction();
+    KuduSession session = txn.newKuduSession();
+
+    KuduTable table = client.openTable(tableName);
+
+    Insert insert = createBasicSchemaInsert(table, 0);
+    session.apply(insert);
+    session.flush();
+
+    harness.killLeaderMasterServer();
+
+    // Wait for two keepalive intervals to make sure the backend got a chance
+    // to automatically abort the transaction if not receiving txn keepalive
+    // messages.
+    Thread.sleep(2 * TXN_KEEPALIVE_INTERVAL_MS);
+
+    // It should be possible to commit the transaction. This is to verify that
+    //
+    //   * the client eventually starts sending txn keepalive messages to other
+    //     TxnManager instance (the original was hosted by former leader master
+    //     which is no longer available), so the backend doesn't abort the
+    //     transaction automatically due to not receiving keepalive messages
+    //
+    //   * the client switches to the new TxnManager for other txn-related
+    //     operations as well
+    txn.commit(true /*wait*/);
+
+    // An extra sanity check: read back the row written into the table in the
+    // context of the transaction.
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+        .replicaSelection(ReplicaSelection.LEADER_ONLY)
+        .build();
+
+    assertEquals(1, scanner.nextRows().getNumRows());
+  }
+
   // TODO(aserbin): when test harness allows for sending Kudu servers particular
   //                signals, add a test scenario to verify that timeout for
   //                TxnManager request is set low enough to detect 'frozen'