You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/24 16:11:16 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #15015: [enh][transaction] Optimize to reuse transaction buffer snapshot writer

eolivelli commented on code in PR #15015:
URL: https://github.com/apache/pulsar/pull/15015#discussion_r857148325


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java:
##########
@@ -18,69 +18,163 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
 import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
 import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
-import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    private final Map<NamespaceName, SystemTopicClient<TransactionBufferSnapshot>> clients;
 
     private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
+    private final HashMap<NamespaceName, ReferenceCountedWriter> writerMap;
+    private final LinkedList<Writer<TransactionBufferSnapshot>> pendingCloseWriterList;
+
+    // The class ReferenceCountedWriter will maintain the reference count,
+    // when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
+    public static class ReferenceCountedWriter {
+
+        private final AtomicLong referenceCount;
+        private final NamespaceName namespaceName;
+        private final CompletableFuture<Writer<TransactionBufferSnapshot>> future;
+
+        public ReferenceCountedWriter(NamespaceName namespaceName,
+                                      CompletableFuture<Writer<TransactionBufferSnapshot>> future,
+                                      HashMap<NamespaceName, ReferenceCountedWriter> writerMap) {
+            this.referenceCount = new AtomicLong(1);
+            this.namespaceName = namespaceName;
+            this.future = future;
+            this.future.exceptionally(t -> {
+                log.error("[{}] Failed to create transaction buffer snapshot writer.", namespaceName, t);
+                writerMap.remove(namespaceName, this);
+                return null;
+            });
+        }
+
+        public CompletableFuture<Writer<TransactionBufferSnapshot>> getFuture() {
+            return future;
+        }
+
+        private void retain() {
+            operationValidate(true);
+            this.referenceCount.incrementAndGet();
+        }
+
+        private long release() {
+            operationValidate(false);
+            return this.referenceCount.decrementAndGet();
+        }
+
+        private void operationValidate(boolean isRetain) {
+            if (this.referenceCount.get() == 0) {
+                throw new RuntimeException(
+                        "[" + namespaceName + "] The reference counted transaction buffer snapshot writer couldn't "
+                                + "be " + (isRetain ? "retained" : "released") + ", refCnt is 0.");
+            }
+        }
+
+    }
+
     public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
         this.clients = new ConcurrentHashMap<>();
+        this.writerMap = new HashMap<>();
+        this.pendingCloseWriterList = new LinkedList<>();
     }
 
     @Override
     public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
-        return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
+        if (topicName == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidTopicNameException(
+                            "Can't create SystemTopicBaseTxnBufferSnapshotService, because the topicName is null!"));
+        }
+        return getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newWriterAsync();
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
+    @Override
+    public synchronized ReferenceCountedWriter getReferenceWriter(NamespaceName namespaceName) {
+        AtomicBoolean exitingFlag = new AtomicBoolean(false);
+        ReferenceCountedWriter referenceCountedWriter = writerMap.compute(namespaceName, (k, v) -> {
+            if (v == null) {
+                return new ReferenceCountedWriter(namespaceName,
+                        getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(), writerMap);
+            }
+            exitingFlag.set(true);
+            return v;
+        });
+        if (exitingFlag.get()) {
+            referenceCountedWriter.retain();
+        }
+        return referenceCountedWriter;
+    }
+
+    @Override
+    public synchronized void releaseReferenceWriter(ReferenceCountedWriter referenceCountedWriter) {
+        if (referenceCountedWriter.release() == 0) {
+            writerMap.remove(referenceCountedWriter.namespaceName, referenceCountedWriter);
+            referenceCountedWriter.future.thenAccept(writer -> {
+                pendingCloseWriterList.add(writer);
+                closePendingCloseWriter();
+            });
         }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
+    }
+
+    private SystemTopicClient<TransactionBufferSnapshot> getTransactionBufferSystemTopicClient(
+            NamespaceName namespaceName) {
+        return clients.computeIfAbsent(namespaceName,
                 (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
+                        .createTransactionBufferSystemTopicClient(namespaceName, this));
     }
 
     @Override
     public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
-        return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
+        return getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync();
     }
 
     @Override
     public void removeClient(TopicName topicName,
                                           TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
-            clients.remove(topicName);
+            clients.remove(topicName.getNamespaceObject());
         }
     }
 
     @Override
     public void close() throws Exception {
-        for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
+        for (Map.Entry<NamespaceName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
             entry.getValue().close();
         }
     }
+
+    private void closePendingCloseWriter() {
+        Iterator<Writer<TransactionBufferSnapshot>> iterator = pendingCloseWriterList.stream().iterator();

Review Comment:
   Nit: why are you creating the stream?
   We can iterate on the LinkedList directly, creating less garbage  and also making the code simpler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org