You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/02/25 09:02:08 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10077 - Mode to raise events only locally (#99)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new e18f2bd  SLING-10077 - Mode to raise events only locally (#99)
e18f2bd is described below

commit e18f2bd36e8b43814520e87bd4999d3ca77ce8ca
Author: balasoiuroxana <99...@users.noreply.github.com>
AuthorDate: Fri Feb 25 10:02:02 2022 +0100

    SLING-10077 - Mode to raise events only locally (#99)
    
    * Introduce a mode to raise events only locally.
      The mode is disabled by default and enabled
      via the `deduplicateEvent` property
    * Introduce a mode to ensure distributed event
      are raised at least once (tracked in SLING-10583)
      The mode is disabled by default and enabled
      via the `ensureEvent` property
---
 pom.xml                                            |  11 ++
 .../publisher/DistributedEventNotifierManager.java | 155 +++++++++++++++++++
 .../impl/publisher/PackageDistributedNotifier.java |  97 ++++++++----
 .../DistributedEventNotifierManagerTest.java       | 135 ++++++++++++++++
 .../publisher/PackageDistributedNotifierTest.java  | 171 +++++++++++++++------
 5 files changed, 495 insertions(+), 74 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5b7c0e5..c218c27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,11 @@
             <version>0.3.0</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.api</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.webconsole</artifactId>
             <version>4.3.16</version>
@@ -249,5 +254,11 @@
             <version>1.0.13</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.impl</artifactId>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
new file mode 100644
index 0000000..bf289ae
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import java.util.Hashtable;
+
+import static org.apache.sling.commons.scheduler.Scheduler.*;
+import static org.apache.sling.discovery.TopologyEvent.Type;
+import static org.apache.sling.discovery.TopologyEvent.Type.*;
+
+@Component(immediate = true, service = {TopologyEventListener.class, Runnable.class}, property = {
+        PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+        PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
+        PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60 * 10, // 10 minutes
+        PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER
+})
+@Designate(ocd = DistributedEventNotifierManager.Configuration.class)
+public class DistributedEventNotifierManager implements TopologyEventListener, Runnable {
+
+    /*
+     * Register the package distributed event notifier service
+     * on all or only the leader instance in a cluster according
+     * to the configuration.
+     */
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
+    private PubQueueProvider pubQueueCacheService;
+
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private Topics topics;
+
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
+    private ServiceRegistration<TopologyChangeHandler> reg;
+
+    private BundleContext context;
+
+    private Configuration config;
+
+    private PackageDistributedNotifier notifier;
+
+    @Activate
+    public void activate(BundleContext context, Configuration config) {
+        this.context = context;
+        this.config = config;
+        this.notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, config.ensureEvent());
+        if (! config.deduplicateEvent()) {
+            registerService();
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        unregisterService();
+    }
+
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (config.deduplicateEvent()) {
+            Type eventType = event.getType();
+            if (eventType == TOPOLOGY_INIT || eventType == TOPOLOGY_CHANGED) {
+                if (event.getNewView().getLocalInstance().isLeader()) {
+                    registerService();
+                } else {
+                    unregisterService();
+                }
+            } else if (eventType == TOPOLOGY_CHANGING) {
+                unregisterService();
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        /*
+         * To avoid conflicting writes, only the leader instance persists the last distributed offset in the repository.
+         *
+         * The task runs at a frequency of 10 minutes to avoid overloading the author repository with a steady stream of
+         * fast commits (approximately 10 commit per second).
+         */
+        notifier.storeLastDistributedOffset();
+    }
+
+    protected boolean isLeader() {
+        return (reg != null);
+    }
+
+    private synchronized void registerService() {
+        if (reg == null) {
+            reg = context.registerService(TopologyChangeHandler.class, notifier, new Hashtable<>());
+        }
+    }
+
+    private synchronized void unregisterService() {
+        if (reg != null) {
+            reg.unregister();
+            reg = null;
+        }
+    }
+
+    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Distributed Event Notifier Configuration",
+            description = "Apache Sling Content Distribution Package Distributed Event Notifier Configuration")
+    public @interface Configuration {
+
+        @AttributeDefinition(name = "Deduplicate event",
+                description = "When true the distributed event will be sent only on one instance in the cluster. " +
+                        "When false the distributed event will be sent on all instances in the cluster. Default is false")
+        boolean deduplicateEvent() default false;
+
+        @AttributeDefinition(name = "Ensure event",
+                description = "When true events will be sent from the last distributed event persisted in the repository. Default is false")
+        boolean ensureEvent() default false;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b3e54c3..8c5d60b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -18,14 +18,10 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.stream.LongStream;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
 import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
 import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
@@ -34,49 +30,65 @@ import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.MessagingProvider;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.LongStream;
+
+import static org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.escapeTopicName;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
 import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
 
-@Component(immediate = true)
 @ParametersAreNonnullByDefault
 public class PackageDistributedNotifier implements TopologyChangeHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
 
-    @Reference
-    private EventAdmin eventAdmin;
+    public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";
+
+    private final ConcurrentMap<String, Long> lastDistributedOffsets = new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<String, LocalStore> localStores = new ConcurrentHashMap<>();
+
+    private final EventAdmin eventAdmin;
+
+    private final PubQueueProvider pubQueueCacheService;
 
-    @Reference
-    private PubQueueProvider pubQueueCacheService;
+    private final MessagingProvider messagingProvider;
 
-    @Reference
-    private MessagingProvider messagingProvider;
+    private final Topics topics;
 
-    @Reference
-    private Topics topics;
+    private final ResourceResolverFactory resolverFactory;
 
     private Consumer<PackageDistributedMessage> sender;
 
-    private boolean sendMsg;
+    private final boolean sendMsg;
+
+    private final boolean ensureEvent;
+
+    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueCacheService, MessagingProvider messagingProvider, Topics topics, ResourceResolverFactory resolverFactory, boolean ensureEvent) {
+        this.eventAdmin = eventAdmin;
+        this.pubQueueCacheService = pubQueueCacheService;
+        this.messagingProvider = messagingProvider;
+        this.topics = topics;
+        this.resolverFactory = resolverFactory;
+        this.ensureEvent = ensureEvent;
 
-    @Activate
-    public void activate() {
         sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
         if (sendMsg) {
             sender = messagingProvider.createSender(topics.getEventTopic());
         }
+
         LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
     }
 
@@ -91,6 +103,12 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
      */
     private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
         long minOffset = offsets.get().findFirst().getAsLong();
+
+        if (ensureEvent) {
+            long lastDistributedOffset = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
+            minOffset = Math.min(offsets.get().findFirst().getAsLong(), lastDistributedOffset);
+        }
+
         OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
         offsets
             .get()
@@ -99,6 +117,32 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
             .forEach(msg -> notifyDistributed(pubAgentName, msg));
     }
 
+    private long getLastStoredDistributedOffset(String pubAgentName) {
+        return localStores.computeIfAbsent(pubAgentName, this::newLocalStore).load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
+    }
+
+    private LocalStore newLocalStore(String pubAgentName) {
+        String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
+        return new LocalStore(resolverFactory, packageNodeName, pubAgentName);
+    }
+
+    protected void storeLastDistributedOffset() {
+        for (Map.Entry<String, LocalStore> localStoreEntry: localStores.entrySet()) {
+            String pubAgentName = localStoreEntry.getKey();
+            LocalStore localStore = localStoreEntry.getValue();
+            long lastDistributedOffset = lastDistributedOffsets.getOrDefault(pubAgentName, Long.MAX_VALUE);
+            long lastStoredOffset = localStore.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
+            if (lastDistributedOffset != lastStoredOffset) {
+                try {
+                    localStore.store(STORE_TYPE_OFFSETS, lastDistributedOffset);
+                    LOG.info("The offset={} has been stored for the pubAgentName={}", lastDistributedOffset, pubAgentName);
+                } catch (Exception  e) {
+                    LOG.warn("Exception when storing the last distributed offset in the repository", e);
+                }
+            }
+        }
+    }
+
     protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
         LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
         sendEvt(pubAgentName, queueItem);
@@ -130,6 +174,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
         try {
             Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
             eventAdmin.sendEvent(distributed);
+            lastDistributedOffsets.put(pubAgentName, (Long)(queueItem.getOrDefault(QueueItemFactory.RECORD_OFFSET, Long.MAX_VALUE)));
         } catch (Exception e) {
             LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
         }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
new file mode 100644
index 0000000..c210dc1
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl;
+import org.apache.sling.discovery.impl.topology.TopologyViewImpl;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+public class DistributedEventNotifierManagerTest {
+    @Mock
+    private EventAdmin eventAdmin;
+
+    @Mock
+    private PubQueueProvider pubQueueCacheService;
+
+    @Mock
+    private MessagingProvider messagingProvider;
+
+    @Spy
+    private Topics topics;
+
+    @Spy
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
+    @InjectMocks
+    private DistributedEventNotifierManager notifierManager;
+
+    private final BundleContext context = MockOsgi.newBundleContext();
+
+    private DistributedEventNotifierManager.Configuration config;
+
+    @Before
+    public void before() {
+        initMocks(this);
+    }
+
+    @Test
+    public void testConfig() {
+        Map<String, Boolean> config = new HashMap<>();
+
+        config.put("deduplicateEvent", false);
+        notifierManager.activate(context, configuration(config, DistributedEventNotifierManager.Configuration.class));
+        assertTrue(notifierManager.isLeader());
+
+        notifierManager.deactivate();
+
+        config.put("deduplicateEvent", true);
+        notifierManager.activate(context, configuration(config, DistributedEventNotifierManager.Configuration.class));
+        assertFalse(notifierManager.isLeader());
+    }
+
+    @Test
+    public void testHandleTopologyEvent() {
+        Map<String, Boolean> config = new HashMap<>();
+        config.put("deduplicateEvent", true);
+        notifierManager.activate(context, configuration(config, DistributedEventNotifierManager.Configuration.class));
+
+        TopologyView oldView = new TopologyViewImpl();
+        TopologyView newView = newViewWithInstanceDescription(true);
+
+        TopologyEvent event = new TopologyEvent(TopologyEvent.Type.PROPERTIES_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_INIT, null, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertTrue(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertTrue(notifierManager.isLeader());
+
+        newView = newViewWithInstanceDescription(false);
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGING, oldView, null);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+    }
+
+    private TopologyView newViewWithInstanceDescription(boolean isLeader) {
+        InstanceDescription description = new DefaultInstanceDescriptionImpl(null, isLeader, true, "slingId", null);
+        return new TopologyViewImpl(Arrays.asList(description));
+    }
+
+    private <T> T configuration(Map<String, Boolean> props, Class<T> clazz) {
+        return standardConverter()
+                .convert(props)
+                .to(clazz);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index 60670b5..3339859 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -18,97 +18,172 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static java.util.Arrays.asList;
-import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
-import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.*;
 import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
 import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
 import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.impl.PubQueueProviderImpl;
+import org.apache.sling.distribution.journal.queue.impl.QueueErrors;
 import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.Spy;
+import org.mockito.*;
+import org.osgi.framework.BundleContext;
 import org.osgi.service.event.EventAdmin;
 
+import java.io.Closeable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
 public class PackageDistributedNotifierTest {
 
+    private static final String PUB_AGENT_NAME = "publish1";
+    private static final String SUB_AGENT_NAME = "subscriber1";
+
+    @Mock
+    private EventAdmin eventAdmin;
+
     @Mock
     private PubQueueProvider pubQueueCacheService;
 
     @Mock
-    private OffsetQueue<DistributionQueueItem> offsetQueue;
+    private MessagingProvider messagingProvider;
 
     @Spy
     private Topics topics;
 
+    @Spy
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
     @Mock
-    private MessagingProvider messagingProvider;
+    private MessageSender<Object> sender;
+
+    @Captor
+    private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+
+    @Captor
+    private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor;
 
     @Mock
-    private EventAdmin eventAdmin;
+    private CacheCallback callback;
 
-    @InjectMocks
-    private PackageDistributedNotifier notifier;
+    @Mock
+    private Closeable poller;
 
     @Mock
-    private MessageSender<Object> sender;
+    private Closeable statPoller;
 
-    @Captor
-    private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+    private final BundleContext context = MockOsgi.newBundleContext();
+
+    private MessageHandler<PackageMessage> handler;
+
+    private PubQueueProviderImpl queueProvider;
+
+    private PackageDistributedNotifier notifier;
 
     @Before
-    public void before() {
+    public void before() throws URISyntaxException {
         initMocks(this);
-        when(offsetQueue.getItem(anyLong()))
-                .thenReturn(createItem());
-        when(pubQueueCacheService.getOffsetQueue(anyString(), anyLong()))
-                .thenReturn(offsetQueue);
+        when(callback.createConsumer(handlerCaptor.capture()))
+                .thenReturn(poller);
+        when(messagingProvider.createPoller(
+                Mockito.eq(Topics.STATUS_TOPIC),
+                any(Reset.class),
+                statHandlerCaptor.capture()))
+                .thenReturn(statPoller);
+        URI serverURI = new URI("http://myserver.apache.org:1234/somepath");
+        when(messagingProvider.getServerUri()).thenReturn(serverURI);
         when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic())))
             .thenReturn(sender);
+
+        QueueErrors queueErrors = mock(QueueErrors.class);
+        queueProvider = new PubQueueProviderImpl(eventAdmin, queueErrors,  callback, context);
+        handler = handlerCaptor.getValue();
+        for(int i = 0; i <= 20; i++)
+            handler.handle(info(i), packageMessage("packageid" + i, PUB_AGENT_NAME));
+
+        notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, true);
     }
 
     @Test
     public void testChanged() throws Exception {
-        notifier.activate();
         TopologyViewDiff diffView = new TopologyViewDiff(
-                buildView(new State("pub1", "sub1", 1000, 10, 0, -1, false)),
-                buildView(new State("pub1", "sub1", 2000, 13, 0, -1, false)));
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 10, 0, -1, false)),
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 13, 0, -1, false)));
+        when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 11))
+                .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 11));
         notifier.changed(diffView);
         verify(sender, times(3)).accept(messageCaptor.capture());
     }
 
-    private DistributionQueueItem createItem() {
-        Map<String, Object> properties = new HashMap<>();
-        properties.put(QueueItemFactory.RECORD_OFFSET, 10l);
-        properties.put(PROPERTY_REQUEST_PATHS, new String[] {"/test"});
-        properties.put(PROPERTY_REQUEST_DEEP_PATHS, new String[] {"/test"});
-        return new DistributionQueueItem("packageId", properties);
+    @Test
+    public void testPersistLastRaisedOffset() throws Exception {
+        TopologyViewDiff diffView1 = new TopologyViewDiff(
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 10, 0, -1, false)),
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 13, 0, -1, false)));
+        // there is no value for the last raised offset persisted in the author repository
+        when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 11))
+                .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 11));
+        notifier.changed(diffView1);
+        verify(sender, times(3)).accept(messageCaptor.capture());
+
+        notifier.storeLastDistributedOffset();
+
+        TopologyViewDiff diffView2 = new TopologyViewDiff(
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 15, 0, -1, false)),
+                buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 20, 0, -1, false)));
+        // the last raised offset persisted in the author repository is 13
+        when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 13))
+                .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 13));
+        notifier.changed(diffView2);
+        verify(sender, times(3 + 5)).accept(messageCaptor.capture());
+
+        notifier.storeLastDistributedOffset();
+
+        notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, false);
+        // the last raised offset persisted in the author repository is not considered because `ensureEvent` is disabled
+        when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 16))
+                .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 16));
+        notifier.changed(diffView2);
+        verify(sender, times(3 + 5 + 5)).accept(messageCaptor.capture());
     }
 
     private TopologyView buildView(State ... state) {
         return new TopologyView(new HashSet<>(asList(state)));
     }
-}
\ No newline at end of file
+
+    private MessageInfo info(long offset) {
+        MessageInfo info = Mockito.mock(MessageInfo.class);
+        when(info.getOffset()).thenReturn(offset);
+        return info;
+    }
+
+    private PackageMessage packageMessage(String packageId, String pubAgentName) {
+        return PackageMessage.builder()
+                .pubAgentName(pubAgentName)
+                .pubSlingId("pub1SlingId")
+                .pkgId(packageId)
+                .reqType(PackageMessage.ReqType.ADD)
+                .pkgType("journal")
+                .paths(Collections.singletonList("path"))
+                .build();
+    }
+
+}