You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/23 15:33:55 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-8908 - Fixing error in merge
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 433ff00 SLING-8908 - Fixing error in merge
433ff00 is described below
commit 433ff002c07a300510199bb611bd3cafb3b5d3c5
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 23 16:33:01 2019 +0100
SLING-8908 - Fixing error in merge
---
.../journal/impl/subscriber/BookKeeper.java | 10 +++++++++-
.../impl/subscriber/DistributionSubscriber.java | 20 ++++++++------------
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index e8136c2..4518e7b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -206,7 +206,7 @@ public class BookKeeper implements Closeable {
}
}
- public void removePackage(PackageMessage pkgMsg, long offset) throws Exception {
+ public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
@@ -219,6 +219,14 @@ public class BookKeeper implements Closeable {
packageRetries.clear(pkgMsg.getPubAgentName());
context.stop();
}
+
+ public void skipPackage(long offset) throws LoginException, PersistenceException {
+ log.info(format("Skipping package at offset %s", offset));
+ try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+ storeOffset(resolver, offset);
+ resolver.commit();
+ }
+ }
public void sendStoredStatus() throws InterruptedException, IOException {
try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c77d588..13b59b9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -46,6 +46,8 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
@@ -299,19 +301,13 @@ public class DistributionSubscriber implements DistributionAgent {
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
if (shouldEnqueue(message)) {
- try {
- DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
- enqueue(queueItem);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException();
- }
+ DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+ enqueue(queueItem);
} else {
- try (ResourceResolver resolver = getServiceResolver("bookkeeper")) {
- storeOffset(resolver, info.getOffset());
- resolver.commit();
- } catch (LoginException | PersistenceException e) {
- LOG.warn("Error storing offset", e);
+ try {
+ bookKeeper.skipPackage(info.getOffset());
+ } catch (PersistenceException | LoginException e) {
+ LOG.info("Error marking message at offset {} as skipped", info.getOffset(), e); // offset fills {}; trailing e is logged as the throwable
}
}
}