You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/28 22:49:31 UTC
[kudu] branch master updated: KUDU-2612: propagate commit timestamp
(Java client)
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 28f2f35 KUDU-2612: propagate commit timestamp (Java client)
28f2f35 is described below
commit 28f2f35fa1ff0e490043a9308b011d8fb5709283
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Apr 27 20:40:28 2021 -0700
KUDU-2612: propagate commit timestamp (Java client)
With this patch, the commit timestamp for a non-empty multi-row
transaction is propagated to a Kudu Java client upon calling either
KuduTransaction.isCommitComplete() or KuduTransaction.commit(true).
The former propagates the timestamp for the case of committing a
transaction asynchronously, the latter works for the synchronous case.
Updating the last observed timestamp with the commit timestamp is
necessary to achieve consistency in the READ_YOUR_WRITES mode when
reading the data of a transaction which has just been committed. The
commit phase might take some time and may even be retried in some cases,
so even if the client observed timestamps for all the write operations
it sent in the context of this transaction, the maximum timestamp collected
among the involved transaction participants might be far ahead of the
last timestamp observed by the client so far.
In addition, this patch addresses the most prominent cause of flakiness
in the recently introduced scenario
TestKuduTransaction.testTxnKeepaliveRollingSwitchToOtherTxnManager.
This patch is a follow-up to e495d6bb759fdae7cd001d86df3bae5c4f5f2b36.
Change-Id: I4177fe0d137b70bd18dd6c87eb42e8aaf03a00b3
Reviewed-on: http://gerrit.cloudera.org:8080/17356
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
---
.../kudu/client/GetTransactionStateRequest.java | 5 +-
.../kudu/client/GetTransactionStateResponse.java | 19 +++-
.../org/apache/kudu/client/KuduTransaction.java | 6 ++
.../apache/kudu/client/TestKuduTransaction.java | 111 ++++++++++++++++++++-
4 files changed, 138 insertions(+), 3 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
index be606cd..702806a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateRequest.java
@@ -21,6 +21,7 @@ import static org.apache.kudu.transactions.TxnManager.GetTransactionStateRespons
import java.util.Collection;
import java.util.List;
+import java.util.OptionalLong;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -73,8 +74,10 @@ class GetTransactionStateRequest extends KuduRpc<GetTransactionStateResponse> {
if (!b.hasError()) {
Preconditions.checkState(b.hasState());
}
+ OptionalLong ts = b.hasCommitTimestamp() ? OptionalLong.of(b.getCommitTimestamp())
+ : OptionalLong.empty();
GetTransactionStateResponse response = new GetTransactionStateResponse(
- timeoutTracker.getElapsedMillis(), serverUUID, b.getState());
+ timeoutTracker.getElapsedMillis(), serverUUID, b.getState(), ts);
return new Pair<>(response, b.hasError() ? b.getError() : null);
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
index 0dc576a..d410241 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTransactionStateResponse.java
@@ -17,6 +17,9 @@
package org.apache.kudu.client;
+import java.util.OptionalLong;
+
+import com.google.common.base.Preconditions;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.kudu.transactions.Transactions;
@@ -24,6 +27,7 @@ import org.apache.kudu.transactions.Transactions;
@InterfaceAudience.Private
public class GetTransactionStateResponse extends KuduRpcResponse {
private final Transactions.TxnStatePB txnState;
+ private final OptionalLong txnCommitTimestamp;
/**
* @param elapsedMillis time in milliseconds since RPC creation to now
@@ -31,15 +35,28 @@ public class GetTransactionStateResponse extends KuduRpcResponse {
* @param txnState the state of the transaction
*/
GetTransactionStateResponse(
- long elapsedMillis, String serverUUID, Transactions.TxnStatePB txnState) {
+ long elapsedMillis,
+ String serverUUID,
+ Transactions.TxnStatePB txnState,
+ OptionalLong txnCommitTimestamp) {
super(elapsedMillis, serverUUID);
this.txnState = txnState;
+ this.txnCommitTimestamp = txnCommitTimestamp;
}
public Transactions.TxnStatePB txnState() {
return txnState;
}
+ boolean hasCommitTimestamp() {
+ return txnCommitTimestamp.isPresent();
+ }
+
+ long getCommitTimestamp() {
+ Preconditions.checkState(hasCommitTimestamp());
+ return txnCommitTimestamp.getAsLong();
+ }
+
public boolean isCommitted() {
return txnState == Transactions.TxnStatePB.COMMITTED;
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 9568971..042ec69 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -313,6 +313,9 @@ public class KuduTransaction implements AutoCloseable {
Deferred<GetTransactionStateResponse> d = isTransactionCommittedAsync();
GetTransactionStateResponse resp = KuduClient.joinAndHandleException(d);
final Transactions.TxnStatePB txnState = resp.txnState();
+ if (resp.hasCommitTimestamp()) {
+ client.updateLastPropagatedTimestamp(resp.getCommitTimestamp());
+ }
switch (txnState) {
case ABORT_IN_PROGRESS:
throw new NonRecoverableException(Status.Aborted("transaction is being aborted"));
@@ -551,6 +554,9 @@ public class KuduTransaction implements AutoCloseable {
private Callback<Deferred<GetTransactionStateResponse>, GetTransactionStateResponse>
isTransactionCommittedCb(final KuduRpc<GetTransactionStateResponse> rpc) {
return resp -> {
+ if (resp.hasCommitTimestamp()) {
+ client.updateLastPropagatedTimestamp(resp.getCommitTimestamp());
+ }
// Store the Deferred locally; callback() below will reset it and we'd
// return a different, non-triggered Deferred.
Deferred<GetTransactionStateResponse> d = rpc.getDeferred();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 956bb68..c8f9f2c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -42,6 +43,7 @@ import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
import org.apache.kudu.transactions.Transactions.TxnTokenPB;
@@ -723,6 +725,114 @@ public class TestKuduTransaction {
}
/**
+ * This scenario validates the propagation of the commit timestamp for a
+ * multi-row transaction when committing the transaction synchronously via
+ * {@link KuduTransaction#commit(boolean)} with {@code wait = true} or calling
+ * {@link KuduTransaction#isCommitComplete()} once the transaction's commit
+ * has started to run asynchronously.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ // TxnManager functionality is necessary for this scenario.
+ "--txn_manager_enabled",
+ })
+ @TabletServerConfig(flags = {
+ // Inject latency to have a chance of spotting the transaction in the
+ // FINALIZE_IN_PROGRESS state and make KuduTransaction.isCommitComplete()
+ // return 'false' at least once before returning 'true'.
+ "--txn_status_manager_inject_latency_finalize_commit_ms=250",
+ })
+ public void testPropagateTxnCommitTimestamp() throws Exception {
+ final String TABLE_NAME = "propagate_txn_commit_timestamp";
+ client.createTable(
+ TABLE_NAME,
+ ClientTestUtil.getBasicSchema(),
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 8));
+
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ // Make sure the commit timestamp for a transaction is propagated to the
+ // client upon synchronously committing a transaction.
+ {
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+ // Insert many rows: the goal is to get at least one row inserted into
+ // every tablet of the hash-partitioned test table, so every tablet would
+ // be a participant in the transaction, and most likely every tablet
+ // server would be involved.
+ for (int key = 0; key < 128; ++key) {
+ session.apply(createBasicSchemaInsert(table, key));
+ }
+ session.flush();
+ assertEquals(0, session.countPendingErrors());
+
+ final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+ txn.commit(true /*wait*/);
+ final long tsAfterCommit = client.getLastPropagatedTimestamp();
+ assertTrue(tsAfterCommit > tsBeforeCommit);
+ }
+
+ // Make sure the commit timestamp for a transaction is propagated to the
+ // client upon calling KuduTransaction.isCommitComplete().
+ {
+ KuduTransaction txn = client.newTransaction();
+ KuduSession session = txn.newKuduSession();
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+ // Insert many rows: the goal is to get at least one row inserted into
+ // every tablet of the hash-partitioned test table, so every tablet would
+ // be a participant in the transaction, and most likely every tablet
+ // server would be involved.
+ for (int key = 128; key < 256; ++key) {
+ session.apply(createBasicSchemaInsert(table, key));
+ }
+ session.flush();
+ assertEquals(0, session.countPendingErrors());
+
+ final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+ txn.commit(false /*wait*/);
+ assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp());
+
+ assertEventuallyTrue("commit should eventually finalize",
+ new BooleanExpression() {
+ @Override
+ public boolean get() throws Exception {
+ return txn.isCommitComplete();
+ }
+ }, 30000/*timeoutMillis*/);
+ long tsAfterCommitFinalized = client.getLastPropagatedTimestamp();
+ assertTrue(tsAfterCommitFinalized > tsBeforeCommit);
+
+ // A sanity check: calling isCommitComplete() again after the commit phase
+ // has been finalized doesn't change last propagated timestamp at the
+ // client side.
+ for (int i = 0; i < 10; ++i) {
+ assertTrue(txn.isCommitComplete());
+ assertEquals(tsAfterCommitFinalized, client.getLastPropagatedTimestamp());
+ Thread.sleep(10);
+ }
+ }
+
+ // An empty transaction doesn't have a timestamp, so there is nothing to
+ // propagate back to client when an empty transaction is committed, so the
+ // timestamp propagated to the client side should stay unchanged.
+ {
+ KuduTransaction txn = client.newTransaction();
+ final long tsBeforeCommit = client.getLastPropagatedTimestamp();
+ txn.commit(true /*wait*/);
+
+ // Just in case, linger a bit after commit has been finalized, checking
+ // for the timestamp propagated to the client side.
+ for (int i = 0; i < 10; ++i) {
+ Thread.sleep(10);
+ assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp());
+ }
+ }
+ }
+
+ /**
* Test to verify that Kudu client is able to switch to TxnManager hosted by
* other kudu-master process when the previously used one isn't available.
*/
@@ -1044,7 +1154,6 @@ public class TestKuduTransaction {
// context of the transaction.
KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
.readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
- .replicaSelection(ReplicaSelection.LEADER_ONLY)
.build();
assertEquals(numMasters, countRowsInScan(scanner));
}