You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 15:33:55 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8908 - Fixing error in merge

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 433ff00  SLING-8908 - Fixing error in merge
433ff00 is described below

commit 433ff002c07a300510199bb611bd3cafb3b5d3c5
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 23 16:33:01 2019 +0100

    SLING-8908 - Fixing error in merge
---
 .../journal/impl/subscriber/BookKeeper.java          | 10 +++++++++-
 .../impl/subscriber/DistributionSubscriber.java      | 20 ++++++++------------
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index e8136c2..4518e7b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -206,7 +206,7 @@ public class BookKeeper implements Closeable {
         }
     }
 
-    public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+    public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
         log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
         Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
@@ -219,6 +219,14 @@ public class BookKeeper implements Closeable {
         packageRetries.clear(pkgMsg.getPubAgentName());
         context.stop();
     }
+    
+    public void skipPackage(long offset) throws LoginException, PersistenceException {
+        log.info(format("Skipping package at offset %s", offset));
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            storeOffset(resolver, offset);
+            resolver.commit();
+        }
+    }
 
     public void sendStoredStatus() throws InterruptedException, IOException {
         try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c77d588..13b59b9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -46,6 +46,8 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
@@ -299,19 +301,13 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private void handlePackageMessage(MessageInfo info, PackageMessage message) {
         if (shouldEnqueue(message)) {
-            try {
-                DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
-                enqueue(queueItem);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException();
-            }
+            DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+            enqueue(queueItem);
         } else {
-            try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
-                storeOffset(resolver, info.getOffset());
-                resolver.commit();
-            } catch (LoginException | PersistenceException e) {
-                LOG.warn("Error storing offset", e);
+            try {
+                bookKeeper.skipPackage(info.getOffset());
+            } catch (PersistenceException | LoginException e) {
+                LOG.info("Error marking message at offset {} as skipped", e);
             }
         }
     }