You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/01 15:09:32 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9562 - Introduce BookKeeperFactory (#48)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new ea30838  SLING-9562 - Introduce BookKeeperFactory (#48)
ea30838 is described below

commit ea30838f9d3da13a5da030ee04cea76dc8f8a02a
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed Jul 1 17:09:22 2020 +0200

    SLING-9562 - Introduce BookKeeperFactory (#48)
    
    * SLING-9562 - Introduce BookKeeperFactory
    
    * SLING-9562 - Fix DS setup
    
    * SLING-9562 - Improved logging
    
    * SLING-9562 - Fix bugs
---
 .../journal/impl/subscriber/BookKeeper.java        | 14 ++----
 .../journal/impl/subscriber/BookKeeperConfig.java  |  6 +++
 .../journal/impl/subscriber/BookKeeperFactory.java | 58 ++++++++++++++++++++++
 .../impl/subscriber/DistributionSubscriber.java    | 19 ++-----
 .../journal/impl/subscriber/BookKeeperTest.java    |  5 +-
 .../journal/impl/subscriber/SubscriberTest.java    |  4 ++
 6 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 49e44bb..af71132 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -46,7 +45,6 @@ import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -98,16 +96,13 @@ public class BookKeeper implements Closeable {
     private final GaugeService<Integer> retriesGauge;
     private int skippedCounter = 0;
 
-    public BookKeeper(ResourceResolverFactory resolverFactory, 
+    public BookKeeper(
+            ResourceResolverFactory resolverFactory, 
             DistributionMetricsService distributionMetricsService,
-            Packaging packaging,
-            DistributionPackageBuilder packageBuilder,
+            PackageHandler packageHandler,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
             BookKeeperConfig config) { 
-        String pkgType = packageBuilder.getType();
-        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling());
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
         this.sender = sender;
@@ -121,8 +116,7 @@ public class BookKeeper implements Closeable {
         this.errorQueueEnabled = (config.getMaxRetries() >= 0);
         this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
         this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, config.getSubAgentName());
-        log.info("Started bookkeeper {} with package builder {} editable {} maxRetries {}",
-                config.getSubAgentName(), pkgType, config.isEditable(), config.getMaxRetries());
+        log.info("Started bookkeeper {}.", config);
     }
     
     /**
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
index 334f24f..d1355cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
@@ -56,4 +56,10 @@ public class BookKeeperConfig {
     public PackageHandling getPackageHandling() {
         return packageHandling;
     }
+    
+    @Override
+    public String toString() {
+        return String.format("subAgentName=%S, subSlingId=%s, editable=%s, maxRetries=%s, packageHandling=%s",
+                subAgentName, subSlingId, editable, maxRetries, packageHandling);
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java
new file mode 100644
index 0000000..42ff99e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.util.function.Consumer;
+
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+@Component(service = BookKeeperFactory.class)
+public class BookKeeperFactory {
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+    
+    @Reference
+    private DistributionMetricsService distributionMetricsService;
+    
+    @Reference
+    private EventAdmin eventAdmin;
+    
+    @Reference
+    Packaging packaging;
+
+    public BookKeeper create(DistributionPackageBuilder packageBuilder, BookKeeperConfig config, Consumer<PackageStatusMessage> statusSender) {
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+        return new BookKeeper(
+                resolverFactory, 
+                distributionMetricsService, 
+                packageHandler,
+                eventAdmin, 
+                statusSender, 
+                config);
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 1999a2f..fbf0111 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -40,10 +40,8 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -67,7 +65,6 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,18 +91,12 @@ public class DistributionSubscriber {
     private SlingSettingsService slingSettings;
 
     @Reference
-    private ResourceResolverFactory resolverFactory;
-
-    @Reference
     private MessagingProvider messagingProvider;
 
     @Reference
     private Topics topics;
 
     @Reference
-    private EventAdmin eventAdmin;
-
-    @Reference
     private JournalAvailable journalAvailable;
 
     @Reference(name = "precondition")
@@ -115,7 +106,7 @@ public class DistributionSubscriber {
     private DistributionMetricsService distributionMetricsService;
 
     @Reference
-    private Packaging packaging;
+    BookKeeperFactory bookKeeperFactory;
 
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
@@ -151,11 +142,10 @@ public class DistributionSubscriber {
         requireNonNull(context);
         requireNonNull(packageBuilder);
         requireNonNull(slingSettings);
-        requireNonNull(resolverFactory);
         requireNonNull(messagingProvider);
         requireNonNull(topics);
-        requireNonNull(eventAdmin);
         requireNonNull(precondition);
+        requireNonNull(bookKeeperFactory);
 
         if (config.subscriberIdleCheck()) {
             // Unofficial config (currently just for test)
@@ -171,8 +161,7 @@ public class DistributionSubscriber {
 
         Consumer<PackageStatusMessage> sender = messagingProvider.createSender(topics.getStatusTopic());
         BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin,
-                sender, bkConfig);
+        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, sender);
         
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
@@ -361,7 +350,7 @@ public class DistributionSubscriber {
     }
 
     private boolean shouldSkip(long offset) {
-        boolean cleared = commandPoller.isPresent() ? commandPoller.get().isCleared(offset) : false;
+        boolean cleared = commandPoller.isPresent() && commandPoller.get().isCleared(offset);
         Decision decision = waitPrecondition(offset);
         return cleared || decision == Decision.SKIP;
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index e2688a7..6bdb122 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
 
 import java.util.function.Consumer;
 
-import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -57,7 +56,7 @@ public class BookKeeperTest {
     private BookKeeper bookKeeper;
 
     @Mock
-    private Packaging packaging;
+    private PackageHandler packageHandler;
 
     @Mock
     private DistributionPackageBuilder packageBuilder;
@@ -65,7 +64,7 @@ public class BookKeeperTest {
     @Before
     public void before() {
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract);
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin, sender, bkConfig);
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, bkConfig);
     }
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 5070b7e..7dce261 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -171,6 +171,9 @@ public class SubscriberTest {
     SubscriberReadyStore subscriberReadyStore = new SubscriberReadyStore();
 
     @InjectMocks
+    BookKeeperFactory bookKeeperFactory;
+
+    @InjectMocks
     DistributionSubscriber subscriber;
     
     @Captor
@@ -388,6 +391,7 @@ public class SubscriberTest {
         props.putAll(basicProps);
         props.putAll(overrides);
         SubscriberConfiguration config = Converters.standardConverter().convert(props).to(SubscriberConfiguration.class);
+        subscriber.bookKeeperFactory = bookKeeperFactory;
         subscriber.activate(config, context, props);
         packageHandler = packageCaptor.getValue().getHandler();
     }