You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 11:02:06 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-8908 -
Mark skipped packages as processed
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-8908
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 8ac538d0a5cedc59a11fd045a6c0599c981fa7d8
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 23 11:51:43 2019 +0100
SLING-8908 - Mark skipped packages as processed
---
.../impl/subscriber/DistributionSubscriber.java | 31 +++++++++++++++-------
1 file changed, 22 insertions(+), 9 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fcefc7f..e104a19 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -405,21 +405,34 @@ public class DistributionSubscriber implements DistributionAgent {
}
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
+ if (shouldEnqueue(message)) {
+ try {
+ DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+ enqueue(queueItem);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException();
+ }
+ } else {
+ try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
+ storeOffset(resolver, info.getOffset());
+ resolver.commit();
+ } catch (LoginException | PersistenceException e) {
+ LOG.warn("Error storing offset", e);
+ }
+ }
+ }
+
+ private boolean shouldEnqueue(PackageMessage message) {
if (! queueNames.contains(message.getPubAgentName())) {
LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
- return;
+ return false;
}
if (! pkgType.equals(message.getPkgType())) {
LOG.warn(String.format("Skipping package with type %s", message.getPkgType()));
- return;
- }
- DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
- try {
- enqueue(queueItem);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException();
+ return false;
}
+ return true;
}
/**