You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/01 15:09:32 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9562 - Introduce BookKeeperFactory (#48)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new ea30838 SLING-9562 - Introduce BookKeeperFactory (#48)
ea30838 is described below
commit ea30838f9d3da13a5da030ee04cea76dc8f8a02a
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed Jul 1 17:09:22 2020 +0200
SLING-9562 - Introduce BookKeeperFactory (#48)
* SLING-9562 - Introduce BookKeeperFactory
* SLING-9562 - Fix DS setup
* SLING-9562 - Improved logging
* SLING-9562 - Fix bugs
---
.../journal/impl/subscriber/BookKeeper.java | 14 ++----
.../journal/impl/subscriber/BookKeeperConfig.java | 6 +++
.../journal/impl/subscriber/BookKeeperFactory.java | 58 ++++++++++++++++++++++
.../impl/subscriber/DistributionSubscriber.java | 19 ++-----
.../journal/impl/subscriber/BookKeeperTest.java | 5 +-
.../journal/impl/subscriber/SubscriberTest.java | 4 ++
6 files changed, 78 insertions(+), 28 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 49e44bb..af71132 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
@@ -46,7 +45,6 @@ import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -98,16 +96,13 @@ public class BookKeeper implements Closeable {
private final GaugeService<Integer> retriesGauge;
private int skippedCounter = 0;
- public BookKeeper(ResourceResolverFactory resolverFactory,
+ public BookKeeper(
+ ResourceResolverFactory resolverFactory,
DistributionMetricsService distributionMetricsService,
- Packaging packaging,
- DistributionPackageBuilder packageBuilder,
+ PackageHandler packageHandler,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
BookKeeperConfig config) {
- String pkgType = packageBuilder.getType();
- ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling());
- PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
this.packageHandler = packageHandler;
this.eventAdmin = eventAdmin;
this.sender = sender;
@@ -121,8 +116,7 @@ public class BookKeeper implements Closeable {
this.errorQueueEnabled = (config.getMaxRetries() >= 0);
this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, config.getSubAgentName());
- log.info("Started bookkeeper {} with package builder {} editable {} maxRetries {}",
- config.getSubAgentName(), pkgType, config.isEditable(), config.getMaxRetries());
+ log.info("Started bookkeeper {}.", config);
}
/**
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
index 334f24f..d1355cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
@@ -56,4 +56,10 @@ public class BookKeeperConfig {
public PackageHandling getPackageHandling() {
return packageHandling;
}
+
+ @Override
+ public String toString() { // one-line summary of all config fields; %s throughout because %S would upper-case the agent name
+ return String.format("subAgentName=%s, subSlingId=%s, editable=%s, maxRetries=%s, packageHandling=%s",
+ subAgentName, subSlingId, editable, maxRetries, packageHandling);
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java
new file mode 100644
index 0000000..42ff99e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.util.function.Consumer;
+
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+@Component(service = BookKeeperFactory.class)
+public class BookKeeperFactory { // component that builds BookKeeper instances from its injected references plus per-call arguments
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
+ @Reference
+ private DistributionMetricsService distributionMetricsService;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ @Reference
+ Packaging packaging; // NOTE(review): package-private unlike the other references; presumably for test access, confirm
+ /** Creates a BookKeeper for the given package builder and config; statusSender is passed on as its PackageStatusMessage consumer. */
+ public BookKeeper create(DistributionPackageBuilder packageBuilder, BookKeeperConfig config, Consumer<PackageStatusMessage> statusSender) {
+ ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling()); // configured from the config's package handling value
+ PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor); // the handler is assembled here and handed to BookKeeper ready-made
+ return new BookKeeper(
+ resolverFactory,
+ distributionMetricsService,
+ packageHandler,
+ eventAdmin,
+ statusSender,
+ config);
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 1999a2f..fbf0111 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -40,10 +40,8 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -67,7 +65,6 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,18 +91,12 @@ public class DistributionSubscriber {
private SlingSettingsService slingSettings;
@Reference
- private ResourceResolverFactory resolverFactory;
-
- @Reference
private MessagingProvider messagingProvider;
@Reference
private Topics topics;
@Reference
- private EventAdmin eventAdmin;
-
- @Reference
private JournalAvailable journalAvailable;
@Reference(name = "precondition")
@@ -115,7 +106,7 @@ public class DistributionSubscriber {
private DistributionMetricsService distributionMetricsService;
@Reference
- private Packaging packaging;
+ BookKeeperFactory bookKeeperFactory;
@Reference
private SubscriberReadyStore subscriberReadyStore;
@@ -151,11 +142,10 @@ public class DistributionSubscriber {
requireNonNull(context);
requireNonNull(packageBuilder);
requireNonNull(slingSettings);
- requireNonNull(resolverFactory);
requireNonNull(messagingProvider);
requireNonNull(topics);
- requireNonNull(eventAdmin);
requireNonNull(precondition);
+ requireNonNull(bookKeeperFactory);
if (config.subscriberIdleCheck()) {
// Unofficial config (currently just for test)
@@ -171,8 +161,7 @@ public class DistributionSubscriber {
Consumer<PackageStatusMessage> sender = messagingProvider.createSender(topics.getStatusTopic());
BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin,
- sender, bkConfig);
+ bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, sender);
long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
@@ -361,7 +350,7 @@ public class DistributionSubscriber {
}
private boolean shouldSkip(long offset) {
- boolean cleared = commandPoller.isPresent() ? commandPoller.get().isCleared(offset) : false;
+ boolean cleared = commandPoller.isPresent() && commandPoller.get().isCleared(offset);
Decision decision = waitPrecondition(offset);
return cleared || decision == Decision.SKIP;
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index e2688a7..6bdb122 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
import java.util.function.Consumer;
-import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -57,7 +56,7 @@ public class BookKeeperTest {
private BookKeeper bookKeeper;
@Mock
- private Packaging packaging;
+ private PackageHandler packageHandler;
@Mock
private DistributionPackageBuilder packageBuilder;
@@ -65,7 +64,7 @@ public class BookKeeperTest {
@Before
public void before() {
BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract);
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin, sender, bkConfig);
+ bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, bkConfig);
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 5070b7e..7dce261 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -171,6 +171,9 @@ public class SubscriberTest {
SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore();
@InjectMocks
+ BookKeeperFactory bookKeeperFactory;
+
+ @InjectMocks
DistributionSubscriber subscriber;
@Captor
@@ -388,6 +391,7 @@ public class SubscriberTest {
props.putAll(basicProps);
props.putAll(overrides);
SubscriberConfiguration config = Converters.standardConverter().convert(props).to(SubscriberConfiguration.class);
+ subscriber.bookKeeperFactory = bookKeeperFactory;
subscriber.activate(config, context, props);
packageHandler = packageCaptor.getValue().getHandler();
}