You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 13:58:12 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8908 - Mark skipped packages as processed (#20)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f4364e  SLING-8908 - Mark skipped packages as processed (#20)
0f4364e is described below

commit 0f4364e41e69103a078e5ae69b95b61ff5cbda4c
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Mon Dec 23 14:58:06 2019 +0100

    SLING-8908 - Mark skipped packages as processed (#20)
---
 .../impl/subscriber/DistributionSubscriber.java    | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9dd0d70..c77d588 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -298,16 +298,34 @@ public class DistributionSubscriber implements DistributionAgent {
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
+        if (shouldEnqueue(message)) {
+            try {
+                DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+                enqueue(queueItem);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException();
+            }
+        } else {
+            try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
+                storeOffset(resolver, info.getOffset());
+                resolver.commit();
+            } catch (LoginException | PersistenceException e) {
+                LOG.warn("Error storing offset", e);
+            }
+        }
+    }
+
+    private boolean shouldEnqueue(PackageMessage message) {
         if (!queueNames.contains(message.getPubAgentName())) {
             LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", message.getPubAgentName()));
-            return;
+            return false;
         }
         if (!pkgType.equals(message.getPkgType())) {
             LOG.warn(String.format("Skipping package with type %s", message.getPkgType()));
-            return;
+            return false;
         }
-        DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
-        enqueue(queueItem);
+        return true;
     }
 
     /**