You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/10/08 09:43:11 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #12219: Transaction buffer take snapshot max read position

eolivelli commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r724863461



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -308,36 +339,36 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    private void takeSnapshot() {
+    private CompletableFuture<Void> takeSnapshot() {
         changeMaxReadPositionAndAddAbortTimes.set(0);
-        takeSnapshotWriter.thenAccept(writer -> {
-            TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
-            synchronized (TopicTransactionBuffer.this) {
-                snapshot.setTopicName(topic.getName());
-                snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
-                snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
-                List<AbortTxnMetadata> list = new ArrayList<>();
-                aborts.forEach((k, v) -> {
-                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
-                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
-                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
-                    abortTxnMetadata.setLedgerId(v.getLedgerId());
-                    abortTxnMetadata.setEntryId(v.getEntryId());
-                    list.add(abortTxnMetadata);
+        return takeSnapshotWriter.thenAccept(writer -> {
+                    TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
+                    synchronized (TopicTransactionBuffer.this) {
+                        snapshot.setTopicName(topic.getName());
+                        snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+                        snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+                        List<AbortTxnMetadata> list = new ArrayList<>();
+                        aborts.forEach((k, v) -> {
+                            AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                            abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                            abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                            abortTxnMetadata.setLedgerId(v.getLedgerId());
+                            abortTxnMetadata.setEntryId(v.getEntryId());
+                            list.add(abortTxnMetadata);
+                        });
+                        snapshot.setAborts(list);
+                    }
+                    writer.writeAsync(snapshot).thenAccept((messageId) -> {
+                        this.lastSnapshotTimestamps = System.currentTimeMillis();
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}]Transaction buffer take snapshot success! "
+                                    + "messageId : {}", topic.getName(), messageId);
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
+                        return null;
+                    }).join();

Review comment:
       Calling `.join()` here is bad practice.
   We are already returning a CompletableFuture, so you can use thenCombine/thenCompose instead of blocking.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org