You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/28 22:49:31 UTC

[kudu] branch master updated: KUDU-2612: propagate commit timestamp (Java client)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 28f2f35  KUDU-2612: propagate commit timestamp (Java client)
28f2f35 is described below

commit 28f2f35fa1ff0e490043a9308b011d8fb5709283
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Apr 27 20:40:28 2021 -0700

    KUDU-2612: propagate commit timestamp (Java client)
    
    With this patch, the commit timestamp for a non-empty multi-row
    transaction is propagated to a Kudu Java client upon calling either
    KuduTransaction.isCommitComplete() or KuduTransaction.commit(true).
    The former propagates the timestamp when a transaction is committed
    asynchronously; the latter does so for the synchronous case.
    
    Updating the last observed timestamp with the commit timestamp is
    necessary to achieve consistency in the READ_YOUR_WRITES mode when
    reading the data of a transaction which has just been committed.  The
    commit phase might take some time and may even be retried in some cases,
    so even if the client observed timestamps for all the write operations
    it sent in the context of this transaction, the maximum timestamp collected
    among the involved transaction participants might be far ahead of the
    last timestamp observed by the client so far.
    
    In addition, this patch addresses the most prominent cause of flakiness
    in the recently introduced scenario
    TestKuduTransaction.testTxnKeepaliveRollingSwitchToOtherTxnManager.
    
    This patch is a follow-up to e495d6bb759fdae7cd001d86df3bae5c4f5f2b36.
    
    Change-Id: I4177fe0d137b70bd18dd6c87eb42e8aaf03a00b3
    Reviewed-on: http://gerrit.cloudera.org:8080/17356
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../kudu/client/GetTransactionStateRequest.java    |   5 +-
 .../kudu/client/GetTransactionStateResponse.java   |  19 +++-
 .../org/apache/kudu/client/KuduTransaction.java    |   6 ++
 .../apache/kudu/client/TestKuduTransaction.java    | 111 ++++++++++++++++++++-
 4 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
index be606cd..702806a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
@@ -21,6 +21,7 @@ import static org.apache.kudu.transactions.TxnManager.GetTransactionStateRespons
 
 import java.util.Collection;
 import java.util.List;
+import java.util.OptionalLong;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -73,8 +74,10 @@ class GetTransactionStateRequest extends KuduRpc<GetTransactionStateResponse> {
     if (!b.hasError()) {
       Preconditions.checkState(b.hasState());
     }
+    OptionalLong ts = b.hasCommitTimestamp() ? OptionalLong.of(b.getCommitTimestamp())
+                                             : OptionalLong.empty();
     GetTransactionStateResponse response = new GetTransactionStateResponse(
-        timeoutTracker.getElapsedMillis(), serverUUID, b.getState());
+        timeoutTracker.getElapsedMillis(), serverUUID, b.getState(), ts);
     return new Pair<>(response, b.hasError() ? b.getError() : null);
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
index 0dc576a..d410241 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
@@ -17,6 +17,9 @@
 
 package org.apache.kudu.client;
 
+import java.util.OptionalLong;
+
+import com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.kudu.transactions.Transactions;
@@ -24,6 +27,7 @@ import org.apache.kudu.transactions.Transactions;
 @InterfaceAudience.Private
 public class GetTransactionStateResponse extends KuduRpcResponse {
   private final Transactions.TxnStatePB txnState;
+  private final OptionalLong txnCommitTimestamp;
 
   /**
    * @param elapsedMillis time in milliseconds since RPC creation to now
@@ -31,15 +35,28 @@ public class GetTransactionStateResponse extends KuduRpcResponse {
    * @param txnState the state of the transaction
    */
   GetTransactionStateResponse(
-      long elapsedMillis, String serverUUID, Transactions.TxnStatePB txnState) {
+      long elapsedMillis,
+      String serverUUID,
+      Transactions.TxnStatePB txnState,
+      OptionalLong txnCommitTimestamp) {
     super(elapsedMillis, serverUUID);
     this.txnState = txnState;
+    this.txnCommitTimestamp = txnCommitTimestamp;
   }
 
   public Transactions.TxnStatePB txnState() {
     return txnState;
   }
 
+  boolean hasCommitTimestamp() {
+    return txnCommitTimestamp.isPresent();
+  }
+
+  long getCommitTimestamp() {
+    Preconditions.checkState(hasCommitTimestamp());
+    return txnCommitTimestamp.getAsLong();
+  }
+
   public boolean isCommitted() {
     return txnState == Transactions.TxnStatePB.COMMITTED;
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 9568971..042ec69 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -313,6 +313,9 @@ public class KuduTransaction implements AutoCloseable {
     Deferred<GetTransactionStateResponse> d = isTransactionCommittedAsync();
     GetTransactionStateResponse resp = KuduClient.joinAndHandleException(d);
     final Transactions.TxnStatePB txnState = resp.txnState();
+    if (resp.hasCommitTimestamp()) {
+      client.updateLastPropagatedTimestamp(resp.getCommitTimestamp());
+    }
     switch (txnState) {
       case ABORT_IN_PROGRESS:
         throw new NonRecoverableException(Status.Aborted("transaction is being aborted"));
@@ -551,6 +554,9 @@ public class KuduTransaction implements AutoCloseable {
   private Callback<Deferred<GetTransactionStateResponse>, GetTransactionStateResponse>
       isTransactionCommittedCb(final KuduRpc<GetTransactionStateResponse> rpc) {
     return resp -> {
+      if (resp.hasCommitTimestamp()) {
+        client.updateLastPropagatedTimestamp(resp.getCommitTimestamp());
+      }
       // Store the Deferred locally; callback() below will reset it and we'd
       // return a different, non-triggered Deferred.
       Deferred<GetTransactionStateResponse> d = rpc.getDeferred();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 956bb68..c8f9f2c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -42,6 +43,7 @@ import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
 import org.apache.kudu.transactions.Transactions.TxnTokenPB;
 
 
@@ -723,6 +725,114 @@ public class TestKuduTransaction {
   }
 
   /**
+   * This scenario validates the propagation of the commit timestamp for a
+   * multi-row transaction when committing the transaction synchronously via
+   * {@link KuduTransaction#commit(boolean wait = true)} or calling
+   * {@link KuduTransaction#isCommitComplete()} once the transaction's commit
+   * has started to run asynchronously.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  @TabletServerConfig(flags = {
+      // Inject latency to have a chance spotting the transaction in the
+      // FINALIZE_IN_PROGRESS state and make KuduTransaction.isCommitComplete()
+      // to return 'false' at least once before returning 'true'.
+      "--txn_status_manager_inject_latency_finalize_commit_ms=250",
+  })
+  public void testPropagateTxnCommitTimestamp() throws Exception {
+    final String TABLE_NAME = "propagate_txn_commit_timestamp";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 8));
+
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    // Make sure the commit timestamp for a transaction is propagated to the
+    // client upon synchronously committing a transaction.
+    {
+      KuduTransaction txn = client.newTransaction();
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+      // Insert many rows: the goal is to get at least one row inserted into
+      // every tablet of the hash-partitioned test table, so every tablet would
+      // be a participant in the transaction, and most likely every tablet
+      // server would be involved.
+      for (int key = 0; key < 128; ++key) {
+        session.apply(createBasicSchemaInsert(table, key));
+      }
+      session.flush();
+      assertEquals(0, session.countPendingErrors());
+
+      final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+      txn.commit(true /*wait*/);
+      final long tsAfterCommit = client.getLastPropagatedTimestamp();
+      assertTrue(tsAfterCommit > tsBeforeCommit);
+    }
+
+    // Make sure the commit timestamp for a transaction is propagated to the
+    // client upon calling KuduTransaction.isCommitComplete().
+    {
+      KuduTransaction txn = client.newTransaction();
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+      // Insert many rows: the goal is to get at least one row inserted into
+      // every tablet of the hash-partitioned test table, so every tablet would
+      // be a participant in the transaction, and most likely every tablet
+      // server would be involved.
+      for (int key = 128; key < 256; ++key) {
+        session.apply(createBasicSchemaInsert(table, key));
+      }
+      session.flush();
+      assertEquals(0, session.countPendingErrors());
+
+      final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+      txn.commit(false /*wait*/);
+      assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp());
+
+      assertEventuallyTrue("commit should eventually finalize",
+          new BooleanExpression() {
+            @Override
+            public boolean get() throws Exception {
+              return txn.isCommitComplete();
+            }
+          }, 30000/*timeoutMillis*/);
+      long tsAfterCommitFinalized = client.getLastPropagatedTimestamp();
+      assertTrue(tsAfterCommitFinalized > tsBeforeCommit);
+
+      // A sanity check: calling isCommitComplete() again after the commit phase
+      // has been finalized doesn't change last propagated timestamp at the
+      // client side.
+      for (int i = 0; i < 10; ++i) {
+        assertTrue(txn.isCommitComplete());
+        assertEquals(tsAfterCommitFinalized, client.getLastPropagatedTimestamp());
+        Thread.sleep(10);
+      }
+    }
+
+    // An empty transaction doesn't have a timestamp, so there is nothing to
+    // propagate back to client when an empty transaction is committed, so the
+    // timestamp propagated to the client side should stay unchanged.
+    {
+      KuduTransaction txn = client.newTransaction();
+      final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+      txn.commit(true /*wait*/);
+
+      // Just in case, linger a bit after commit has been finalized, checking
+      // for the timestamp propagated to the client side.
+      for (int i = 0; i < 10; ++i) {
+        Thread.sleep(10);
+        assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp());
+      }
+    }
+  }
+
+  /**
    * Test to verify that Kudu client is able to switch to TxnManager hosted by
    * other kudu-master process when the previously used one isn't available.
    */
@@ -1044,7 +1154,6 @@ public class TestKuduTransaction {
     // context of the transaction.
     KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
         .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
-        .replicaSelection(ReplicaSelection.LEADER_ONLY)
         .build();
     assertEquals(numMasters, countRowsInScan(scanner));
   }