You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/18 15:18:13 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #10257: Congbobo184 fix transaction buffer lookup

congbobo184 opened a new pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257


   ## Motivation
   now transaction buffer client handle transaction coordinator command by find topic address and create connect. it can't init `PersistentTopic` in broker, so the command will always fail. 
   ## implement
   1. transaction buffer client handle transaction coordinator command should lookup topic once, the lookup command will init `PersistentTopic`.
   2. add a cache for the transaction buffer client.
   
   ### Verifying this change
   Add the tests for it
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#discussion_r615565480



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -18,53 +18,68 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask {
 
     private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
-    private final ConnectionPool connectionPool;
-    private final NamespaceService namespaceService;
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private Timeout requestTimeout;
     private final HashedWheelTimer timer;
     private final Semaphore semaphore;
     private final boolean blockIfReachMaxPendingOps;
+    private final PulsarClient pulsarClient;
 
-    public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceService namespaceService,
+    private final LoadingCache<String, CompletableFuture<ClientCnx>> cache = CacheBuilder.newBuilder()
+            .maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
+                @Override
+                public CompletableFuture<ClientCnx> load(String topic) {
+                    CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
+                    siFuture.whenComplete((si, cause) -> {
+                        if (null != cause) {
+                            cache.asMap().remove(topic, siFuture);

Review comment:
       it may happen that when we execute "whenComplete" are still inside the execution of this loader, for instance if  getClientCnx 
    returns a completed CompletableFuture and so probably we will encounter some bad error.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#discussion_r618469362



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -113,24 +110,38 @@ public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceServ
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
+        return endTxn(requestId, topic, cmd, cb);
+    }
+
+    private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb) {
         OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         pendingRequests.put(requestId, op);
         cmd.retain();
-        cnx(topic).whenComplete((clientCnx, throwable) -> {
-            if (throwable == null) {
-                try {
-                    clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
-                } catch (Exception e) {
-                    cb.completeExceptionally(e);
+        try {
+            cache.get(topic).whenComplete((clientCnx, throwable) -> {
+                if (throwable == null) {
+                    try {
+                        clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                        clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
+                    } catch (Exception e) {
+                        cache.refresh(topic);
+                        cb.completeExceptionally(e);
+                        pendingRequests.remove(requestId);
+                        op.recycle();
+                    }
+                } else {
+                    cache.refresh(topic);

Review comment:
       "invalidate" is better, i will change. good check thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#issuecomment-825436609


   @eolivelli please review again. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#discussion_r618091050



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
##########
@@ -90,6 +90,8 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;

Review comment:
       these two lines seems unrelated

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -113,24 +110,38 @@ public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceServ
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
+        return endTxn(requestId, topic, cmd, cb);
+    }
+
+    private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb) {
         OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
         pendingRequests.put(requestId, op);
         cmd.retain();
-        cnx(topic).whenComplete((clientCnx, throwable) -> {
-            if (throwable == null) {
-                try {
-                    clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
-                } catch (Exception e) {
-                    cb.completeExceptionally(e);
+        try {
+            cache.get(topic).whenComplete((clientCnx, throwable) -> {
+                if (throwable == null) {
+                    try {
+                        clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                        clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
+                    } catch (Exception e) {
+                        cache.refresh(topic);
+                        cb.completeExceptionally(e);
+                        pendingRequests.remove(requestId);
+                        op.recycle();
+                    }
+                } else {
+                    cache.refresh(topic);

Review comment:
       is it better to use "refresh" or "invalidate" here ?
   
   refresh tries to load a new value, asynchronously, if we received an error, it is probable that we will see an error during refresh as well
   https://guava.dev/releases/19.0/api/docs/com/google/common/cache/LoadingCache.html#refresh(K)
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#discussion_r615890305



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -18,53 +18,68 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask {
 
     private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
-    private final ConnectionPool connectionPool;
-    private final NamespaceService namespaceService;
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private Timeout requestTimeout;
     private final HashedWheelTimer timer;
     private final Semaphore semaphore;
     private final boolean blockIfReachMaxPendingOps;
+    private final PulsarClient pulsarClient;
 
-    public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceService namespaceService,
+    private final LoadingCache<String, CompletableFuture<ClientCnx>> cache = CacheBuilder.newBuilder()
+            .maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
+                @Override
+                public CompletableFuture<ClientCnx> load(String topic) {
+                    CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
+                    siFuture.whenComplete((si, cause) -> {
+                        if (null != cause) {
+                            cache.asMap().remove(topic, siFuture);

Review comment:
       good suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10257: Fix transaction buffer lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10257:
URL: https://github.com/apache/pulsar/pull/10257#discussion_r615453465



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -18,53 +18,68 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask {
 
     private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
-    private final ConnectionPool connectionPool;
-    private final NamespaceService namespaceService;
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private Timeout requestTimeout;
     private final HashedWheelTimer timer;
     private final Semaphore semaphore;
     private final boolean blockIfReachMaxPendingOps;
+    private final PulsarClient pulsarClient;
 
-    public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceService namespaceService,
+    private final LoadingCache<String, CompletableFuture<ClientCnx>> cache = CacheBuilder.newBuilder()
+            .maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
+                @Override
+                public CompletableFuture<ClientCnx> load(String topic) {
+                    CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
+                    siFuture.whenComplete((si, cause) -> {
+                        if (null != cause) {
+                            cache.asMap().remove(topic, siFuture);

Review comment:
       I am not sure I understand this "remove"
   probably it is not allowed to modify the cache inside the same loader, especially for the same key




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org