You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/22 04:19:55 UTC

[pulsar] branch master updated: [Transaction] Fix Transaction buffer client time out. (#10206)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e9321f7  [Transaction] Fix Transaction buffer client time out. (#10206)
e9321f7 is described below

commit e9321f703bf63112819beb3513d51049767f228a
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Apr 22 12:19:13 2021 +0800

    [Transaction] Fix Transaction buffer client time out. (#10206)
    
    ## Motivation
    Currently ```OpRequestSend.create()``` records the request creation time in nanoseconds, but the timeout check uses milliseconds.
    The transaction buffer client handler should also hold a lock while the timeout check runs.
    ## Implementation
    1. Change the creation timestamp from nanoseconds to milliseconds.
    2. Lock the transaction buffer client operations. The timeout check has a synchronization problem with the transaction buffer client handler.
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 23 +++----
 .../buffer/TransactionBufferClientTest.java        | 73 ++++++++++++++++++----
 2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 95f92d7..ed21d13a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -73,7 +73,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
     }
 
     @Override
-    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
+    public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
                                                   TxnAction action, long lowWaterMark) {
         CompletableFuture<TxnID> cb = new CompletableFuture<>();
         if (!canSendRequest(cb)) {
@@ -104,8 +104,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
     }
 
     @Override
-    public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, long txnIdMostBits,
-                                                         long txnIdLeastBits, TxnAction action, long lowWaterMark) {
+    public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
+                                                                      long txnIdMostBits, long txnIdLeastBits,
+                                                                      TxnAction action, long lowWaterMark) {
         CompletableFuture<TxnID> cb = new CompletableFuture<>();
         if (!canSendRequest(cb)) {
             return cb;
@@ -135,7 +136,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
     }
 
     @Override
-    public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
+    public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
         OpRequestSend op = pendingRequests.remove(requestId);
         if (op == null) {
             if (log.isDebugEnabled()) {
@@ -159,7 +160,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
     }
 
     @Override
-    public void handleEndTxnOnSubscriptionResponse(long requestId,
+    public synchronized void handleEndTxnOnSubscriptionResponse(long requestId,
                                                    CommandEndTxnOnSubscriptionResponse response) {
         OpRequestSend op = pendingRequests.remove(requestId);
         if (op == null) {
@@ -234,7 +235,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
+    public synchronized void run(Timeout timeout) throws Exception {
         if (timeout.isCancelled()) {
             return;
         }
@@ -250,17 +251,13 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
                 break;
             }
             firstEntry = pendingRequests.firstEntry();
+            pendingRequests.remove(pendingRequests.firstKey());
             peeked = firstEntry == null ? null : firstEntry.getValue();
         }
         if (peeked == null) {
             timeToWaitMs = operationTimeoutInMills;
         } else {
-            long diff = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis();
-            if (diff <= 0) {
-                timeToWaitMs = operationTimeoutInMills;
-            } else {
-                timeToWaitMs = diff;
-            }
+            timeToWaitMs = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis();
         }
         requestTimeout = timer.newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
     }
@@ -285,7 +282,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
             op.topic = topic;
             op.byteBuf = byteBuf;
             op.cb = cb;
-            op.createdAt = System.nanoTime();
+            op.createdAt = System.currentTimeMillis();
             return op;
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 1d1a828..37ca25a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -23,6 +23,8 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -32,19 +34,32 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 @Test(groups = "broker")
 public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
@@ -87,18 +102,18 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
     protected void afterPulsarStart() throws Exception {
         brokerServices = new BrokerService[pulsarServices.length];
         for (int i = 0; i < pulsarServices.length; i++) {
-            Subscription mockSubscription = Mockito.mock(Subscription.class);
+            Subscription mockSubscription = mock(Subscription.class);
             Mockito.when(mockSubscription.endTxn(Mockito.anyLong(),
                     Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong()))
                     .thenReturn(CompletableFuture.completedFuture(null));
 
-            Topic mockTopic = Mockito.mock(Topic.class);
+            Topic mockTopic = mock(Topic.class);
             Mockito.when(mockTopic.endTxn(any(), Mockito.anyInt(), anyLong()))
                     .thenReturn(CompletableFuture.completedFuture(null));
             Mockito.when(mockTopic.getSubscription(any())).thenReturn(mockSubscription);
 
             ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap =
-                    Mockito.mock(ConcurrentOpenHashMap.class);
+                    mock(ConcurrentOpenHashMap.class);
             Mockito.when(topicMap.get(Mockito.anyString())).thenReturn(
                     CompletableFuture.completedFuture(Optional.of(mockTopic)));
 
@@ -117,8 +132,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
             futures.add(tbClient.commitTxnOnTopic(topic, 1L, i, Long.MIN_VALUE));
         }
         for (int i = 0; i < futures.size(); i++) {
-            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
-            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+            assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            assertEquals(futures.get(i).get().getLeastSigBits(), i);
         }
     }
 
@@ -130,8 +145,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
             futures.add(tbClient.abortTxnOnTopic(topic, 1L, i, Long.MIN_VALUE));
         }
         for (int i = 0; i < futures.size(); i++) {
-            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
-            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+            assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            assertEquals(futures.get(i).get().getLeastSigBits(), i);
         }
     }
 
@@ -143,8 +158,8 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
             futures.add(tbClient.commitTxnOnSubscription(topic, "test", 1L, i, -1L));
         }
         for (int i = 0; i < futures.size(); i++) {
-            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
-            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+            assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            assertEquals(futures.get(i).get().getLeastSigBits(), i);
         }
     }
 
@@ -156,8 +171,42 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
             futures.add(tbClient.abortTxnOnSubscription(topic, "test", 1L, i, -1L));
         }
         for (int i = 0; i < futures.size(); i++) {
-            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
-            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+            assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            assertEquals(futures.get(i).get().getLeastSigBits(), i);
+        }
+    }
+
+    @Test
+    public void testTransactionBufferClientTimeout() throws Exception {
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        NamespaceService namespaceService = mock(NamespaceService.class);
+        @Cleanup("stop")
+        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
+        doReturn(new CompletableFuture<>()).when(namespaceService).getBundleAsync(anyObject());
+        TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl(connectionPool,
+                namespaceService, hashedWheelTimer);
+        CompletableFuture<TxnID> completableFuture =
+                transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);
+
+        Field field = TransactionBufferHandlerImpl.class.getDeclaredField("pendingRequests");
+        field.setAccessible(true);
+        ConcurrentSkipListMap<Long, Object> pendingRequests =
+                (ConcurrentSkipListMap<Long, Object>) field.get(transactionBufferHandler);
+
+        assertEquals(pendingRequests.size(), 1);
+
+        Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
+            if (pendingRequests.size() == 0) {
+                return true;
+            }
+            return false;
+        });
+
+        try {
+            completableFuture.get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
         }
     }
 }