You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/02 14:56:53 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9259 - Small refactor of processQueueItem

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7fcf5  SLING-9259 - Small refactor of processQueueItem
5d7fcf5 is described below

commit 5d7fcf5a0d1e72cb887764289d07fcabcf1a91d8
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 2 16:39:11 2020 +0200

    SLING-9259 - Small refactor of processQueueItem
---
 .../impl/subscriber/DistributionSubscriber.java    | 24 +++++++++++-----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b33b22b..6b7ddd1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -292,8 +292,8 @@ public class DistributionSubscriber {
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item.get());
-            } finally {
-                subscriberIdle.ifPresent(SubscriberIdle::idle);
+                messageBuffer.remove();
+                distributionMetricsService.getItemsBufferSize().decrement();
             }
 
         } catch (PreConditionTimeoutException e) {
@@ -339,17 +339,17 @@ public class DistributionSubscriber {
     private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException {
         MessageInfo info = item.getInfo();
         PackageMessage pkgMsg = item.getMessage();
-        long offset = info.getOffset();
-        boolean skip = shouldSkip(offset);
-        subscriberIdle.ifPresent(SubscriberIdle::busy);
-        if (skip) {
-            bookKeeper.removePackage(pkgMsg, offset);
-        } else {
-            long createdTime = info.getCreateTime();
-            bookKeeper.importPackage(pkgMsg, offset, createdTime);
+        boolean skip = shouldSkip(info.getOffset());
+        try {
+            subscriberIdle.ifPresent(SubscriberIdle::busy);
+            if (skip) {
+                bookKeeper.removePackage(pkgMsg, info.getOffset());
+            } else {
+                bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
+            }
+        } finally {
+            subscriberIdle.ifPresent(SubscriberIdle::idle);
         }
-        messageBuffer.remove();
-        distributionMetricsService.getItemsBufferSize().decrement();
     }
 
     private boolean shouldSkip(long offset) {