You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/22 04:19:55 UTC
[pulsar] branch master updated: [Transaction] Fix Transaction
buffer client time out. (#10206)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e9321f7 [Transaction] Fix Transaction buffer client time out. (#10206)
e9321f7 is described below
commit e9321f703bf63112819beb3513d51049767f228a
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Apr 22 12:19:13 2021 +0800
[Transaction] Fix Transaction buffer client time out. (#10206)
## Motivation
now ```OpRequestSend.create()``` the timeout is nanosecond but time out unit is millisecond.
Transaction buffer client handle should lock when timeout check
## implement
1. change nanosecond to millisecond.
2. lock the transaction buffer client operation. Time out check have synchronization problem with transaction buffer client handle.
---
.../buffer/impl/TransactionBufferHandlerImpl.java | 23 +++----
.../buffer/TransactionBufferClientTest.java | 73 ++++++++++++++++++----
2 files changed, 71 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 95f92d7..ed21d13a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -73,7 +73,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
}
@Override
- public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
+ public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
TxnAction action, long lowWaterMark) {
CompletableFuture<TxnID> cb = new CompletableFuture<>();
if (!canSendRequest(cb)) {
@@ -104,8 +104,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
}
@Override
- public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, long txnIdMostBits,
- long txnIdLeastBits, TxnAction action, long lowWaterMark) {
+ public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
+ long txnIdMostBits, long txnIdLeastBits,
+ TxnAction action, long lowWaterMark) {
CompletableFuture<TxnID> cb = new CompletableFuture<>();
if (!canSendRequest(cb)) {
return cb;
@@ -135,7 +136,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
}
@Override
- public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
+ public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
OpRequestSend op = pendingRequests.remove(requestId);
if (op == null) {
if (log.isDebugEnabled()) {
@@ -159,7 +160,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
}
@Override
- public void handleEndTxnOnSubscriptionResponse(long requestId,
+ public synchronized void handleEndTxnOnSubscriptionResponse(long requestId,
CommandEndTxnOnSubscriptionResponse response) {
OpRequestSend op = pendingRequests.remove(requestId);
if (op == null) {
@@ -234,7 +235,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
}
@Override
- public void run(Timeout timeout) throws Exception {
+ public synchronized void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
@@ -250,17 +251,13 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
break;
}
firstEntry = pendingRequests.firstEntry();
+ pendingRequests.remove(pendingRequests.firstKey());
peeked = firstEntry == null ? null : firstEntry.getValue();
}
if (peeked == null) {
timeToWaitMs = operationTimeoutInMills;
} else {
- long diff = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis();
- if (diff <= 0) {
- timeToWaitMs = operationTimeoutInMills;
- } else {
- timeToWaitMs = diff;
- }
+ timeToWaitMs = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis();
}
requestTimeout = timer.newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
}
@@ -285,7 +282,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
op.topic = topic;
op.byteBuf = byteBuf;
op.cb = cb;
- op.createdAt = System.nanoTime();
+ op.createdAt = System.currentTimeMillis();
return op;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 1d1a828..37ca25a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -23,6 +23,8 @@ import static org.mockito.ArgumentMatchers.anyLong;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -32,19 +34,32 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Test(groups = "broker")
public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
@@ -87,18 +102,18 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
protected void afterPulsarStart() throws Exception {
brokerServices = new BrokerService[pulsarServices.length];
for (int i = 0; i < pulsarServices.length; i++) {
- Subscription mockSubscription = Mockito.mock(Subscription.class);
+ Subscription mockSubscription = mock(Subscription.class);
Mockito.when(mockSubscription.endTxn(Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong()))
.thenReturn(CompletableFuture.completedFuture(null));
- Topic mockTopic = Mockito.mock(Topic.class);
+ Topic mockTopic = mock(Topic.class);
Mockito.when(mockTopic.endTxn(any(), Mockito.anyInt(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(null));
Mockito.when(mockTopic.getSubscription(any())).thenReturn(mockSubscription);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap =
- Mockito.mock(ConcurrentOpenHashMap.class);
+ mock(ConcurrentOpenHashMap.class);
Mockito.when(topicMap.get(Mockito.anyString())).thenReturn(
CompletableFuture.completedFuture(Optional.of(mockTopic)));
@@ -117,8 +132,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
futures.add(tbClient.commitTxnOnTopic(topic, 1L, i, Long.MIN_VALUE));
}
for (int i = 0; i < futures.size(); i++) {
- Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
- Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ assertEquals(futures.get(i).get().getLeastSigBits(), i);
}
}
@@ -130,8 +145,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
futures.add(tbClient.abortTxnOnTopic(topic, 1L, i, Long.MIN_VALUE));
}
for (int i = 0; i < futures.size(); i++) {
- Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
- Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ assertEquals(futures.get(i).get().getLeastSigBits(), i);
}
}
@@ -143,8 +158,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
futures.add(tbClient.commitTxnOnSubscription(topic, "test", 1L, i, -1L));
}
for (int i = 0; i < futures.size(); i++) {
- Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
- Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ assertEquals(futures.get(i).get().getLeastSigBits(), i);
}
}
@@ -156,8 +171,42 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
futures.add(tbClient.abortTxnOnSubscription(topic, "test", 1L, i, -1L));
}
for (int i = 0; i < futures.size(); i++) {
- Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
- Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ }
+ }
+
+ @Test
+ public void testTransactionBufferClientTimeout() throws Exception {
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ NamespaceService namespaceService = mock(NamespaceService.class);
+ @Cleanup("stop")
+ HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
+ doReturn(new CompletableFuture<>()).when(namespaceService).getBundleAsync(anyObject());
+ TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl(connectionPool,
+ namespaceService, hashedWheelTimer);
+ CompletableFuture<TxnID> completableFuture =
+ transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);
+
+ Field field = TransactionBufferHandlerImpl.class.getDeclaredField("pendingRequests");
+ field.setAccessible(true);
+ ConcurrentSkipListMap<Long, Object> pendingRequests =
+ (ConcurrentSkipListMap<Long, Object>) field.get(transactionBufferHandler);
+
+ assertEquals(pendingRequests.size(), 1);
+
+ Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
+ if (pendingRequests.size() == 0) {
+ return true;
+ }
+ return false;
+ });
+
+ try {
+ completableFuture.get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
}
}
}