You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/02 14:56:53 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9259 - Small refactor of processQueueItem
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 5d7fcf5 SLING-9259 - Small refactor of processQueueItem
5d7fcf5 is described below
commit 5d7fcf5a0d1e72cb887764289d07fcabcf1a91d8
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 2 16:39:11 2020 +0200
SLING-9259 - Small refactor of processQueueItem
---
.../impl/subscriber/DistributionSubscriber.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b33b22b..6b7ddd1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -292,8 +292,8 @@ public class DistributionSubscriber {
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item.get());
- } finally {
- subscriberIdle.ifPresent(SubscriberIdle::idle);
+ messageBuffer.remove();
+ distributionMetricsService.getItemsBufferSize().decrement();
}
} catch (PreConditionTimeoutException e) {
@@ -339,17 +339,17 @@ public class DistributionSubscriber {
private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException {
MessageInfo info = item.getInfo();
PackageMessage pkgMsg = item.getMessage();
- long offset = info.getOffset();
- boolean skip = shouldSkip(offset);
- subscriberIdle.ifPresent(SubscriberIdle::busy);
- if (skip) {
- bookKeeper.removePackage(pkgMsg, offset);
- } else {
- long createdTime = info.getCreateTime();
- bookKeeper.importPackage(pkgMsg, offset, createdTime);
+ boolean skip = shouldSkip(info.getOffset());
+ try {
+ subscriberIdle.ifPresent(SubscriberIdle::busy);
+ if (skip) {
+ bookKeeper.removePackage(pkgMsg, info.getOffset());
+ } else {
+ bookKeeper.importPackage(pkgMsg, info.getOffset(), info.getCreateTime());
+ }
+ } finally {
+ subscriberIdle.ifPresent(SubscriberIdle::idle);
}
- messageBuffer.remove();
- distributionMetricsService.getItemsBufferSize().decrement();
}
private boolean shouldSkip(long offset) {