You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2022/02/25 09:02:08 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10077 - Mode to raise events only locally (#99)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new e18f2bd SLING-10077 - Mode to raise events only locally (#99)
e18f2bd is described below
commit e18f2bd36e8b43814520e87bd4999d3ca77ce8ca
Author: balasoiuroxana <99...@users.noreply.github.com>
AuthorDate: Fri Feb 25 10:02:02 2022 +0100
SLING-10077 - Mode to raise events only locally (#99)
* Introduce a mode to raise events only locally.
The mode is disabled by default and enabled
via the `deduplicateEvent` property
* Introduce a mode to ensure distributed events
are raised at least once (tracked in SLING-10583)
The mode is disabled by default and enabled
via the `ensureEvent` property
---
pom.xml | 11 ++
.../publisher/DistributedEventNotifierManager.java | 155 +++++++++++++++++++
.../impl/publisher/PackageDistributedNotifier.java | 97 ++++++++----
.../DistributedEventNotifierManagerTest.java | 135 ++++++++++++++++
.../publisher/PackageDistributedNotifierTest.java | 171 +++++++++++++++------
5 files changed, 495 insertions(+), 74 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5b7c0e5..c218c27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,11 @@
<version>0.3.0</version>
</dependency>
<dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.api</artifactId>
+ <version>1.0.4</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.webconsole</artifactId>
<version>4.3.16</version>
@@ -249,5 +254,11 @@
<version>1.0.13</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.impl</artifactId>
+ <version>1.1.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
new file mode 100644
index 0000000..bf289ae
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import java.util.Hashtable;
+
+import static org.apache.sling.commons.scheduler.Scheduler.*;
+import static org.apache.sling.discovery.TopologyEvent.Type;
+import static org.apache.sling.discovery.TopologyEvent.Type.*;
+
+/**
+ * Registers the {@link PackageDistributedNotifier} as a {@link TopologyChangeHandler}
+ * service on all instances, or only on the leader instance of a cluster, according
+ * to the configuration.
+ *
+ * <p>When {@code deduplicateEvent} is {@code false} the notifier is registered at
+ * activation. When it is {@code true} the notifier is registered only while the local
+ * instance is the leader of the current topology view.</p>
+ *
+ * <p>The component is also published as a scheduled {@link Runnable} (see the scheduler
+ * properties below) that asks the notifier to persist the last distributed offsets.</p>
+ */
+@Component(immediate = true, service = {TopologyEventListener.class, Runnable.class}, property = {
+        PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+        PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true",
+        PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60 * 10, // 10 minutes
+        PROPERTY_SCHEDULER_RUN_ON + "=" + VALUE_RUN_ON_LEADER
+})
+@Designate(ocd = DistributedEventNotifierManager.Configuration.class)
+public class DistributedEventNotifierManager implements TopologyEventListener, Runnable {
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Reference
+    private PubQueueProvider pubQueueCacheService;
+
+    @Reference
+    private MessagingProvider messagingProvider;
+
+    @Reference
+    private Topics topics;
+
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
+    /** Registration of the notifier service; {@code null} while unregistered. Guarded by {@code this}. */
+    private ServiceRegistration<TopologyChangeHandler> reg;
+
+    private BundleContext context;
+
+    private Configuration config;
+
+    private PackageDistributedNotifier notifier;
+
+    /**
+     * Creates the notifier and, unless event deduplication is enabled, registers it
+     * immediately. With deduplication enabled the registration is deferred to
+     * {@link #handleTopologyEvent(TopologyEvent)}.
+     *
+     * @param context bundle context used to register the notifier service
+     * @param config  component configuration
+     */
+    @Activate
+    public void activate(BundleContext context, Configuration config) {
+        this.context = context;
+        this.config = config;
+        this.notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, config.ensureEvent());
+        if (!config.deduplicateEvent()) {
+            registerService();
+        }
+    }
+
+    /** Unregisters the notifier service if it is currently registered. */
+    @Deactivate
+    public void deactivate() {
+        unregisterService();
+    }
+
+    /**
+     * Keeps the notifier registration in line with the leadership of the local instance.
+     * Does nothing unless event deduplication is enabled; event types other than
+     * {@code TOPOLOGY_INIT}, {@code TOPOLOGY_CHANGED} and {@code TOPOLOGY_CHANGING}
+     * are ignored.
+     *
+     * @param event the topology event to handle
+     */
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (!config.deduplicateEvent()) {
+            return;
+        }
+        Type eventType = event.getType();
+        if (eventType == TOPOLOGY_INIT || eventType == TOPOLOGY_CHANGED) {
+            if (event.getNewView().getLocalInstance().isLeader()) {
+                registerService();
+            } else {
+                unregisterService();
+            }
+        } else if (eventType == TOPOLOGY_CHANGING) {
+            // The new view is not known yet, so stop raising events until it is established.
+            unregisterService();
+        }
+    }
+
+    @Override
+    public void run() {
+        /*
+         * To avoid conflicting writes, only the leader instance persists the last distributed offset in the repository.
+         *
+         * The task runs every 10 minutes to avoid overloading the author repository with a steady stream of
+         * fast commits (approximately 10 commits per second).
+         */
+        notifier.storeLastDistributedOffset();
+    }
+
+    /**
+     * Tells whether the notifier service is currently registered. Note that with
+     * deduplication disabled the service is registered at activation regardless of
+     * leadership.
+     *
+     * <p>Synchronized so that the read of the registration is consistent with the
+     * synchronized register/unregister methods.</p>
+     *
+     * @return {@code true} if the notifier service is registered
+     */
+    protected synchronized boolean isLeader() {
+        return (reg != null);
+    }
+
+    /** Registers the notifier service unless it is already registered. */
+    private synchronized void registerService() {
+        if (reg == null) {
+            reg = context.registerService(TopologyChangeHandler.class, notifier, new Hashtable<>());
+        }
+    }
+
+    /** Unregisters the notifier service if it is registered. */
+    private synchronized void unregisterService() {
+        if (reg != null) {
+            reg.unregister();
+            reg = null;
+        }
+    }
+
+    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Package Distributed Event Notifier Configuration",
+            description = "Apache Sling Content Distribution Package Distributed Event Notifier Configuration")
+    public @interface Configuration {
+
+        @AttributeDefinition(name = "Deduplicate event",
+                description = "When true the distributed event will be sent only on one instance in the cluster. " +
+                        "When false the distributed event will be sent on all instances in the cluster. Default is false")
+        boolean deduplicateEvent() default false;
+
+        @AttributeDefinition(name = "Ensure event",
+                description = "When true events will be sent from the last distributed event persisted in the repository. Default is false")
+        boolean ensureEvent() default false;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index b3e54c3..8c5d60b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -18,14 +18,10 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
-
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.stream.LongStream;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
@@ -34,49 +30,65 @@ import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.MessagingProvider;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.ParametersAreNonnullByDefault;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.LongStream;
+
+import static org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.escapeTopicName;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
-@Component(immediate = true)
@ParametersAreNonnullByDefault
public class PackageDistributedNotifier implements TopologyChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
- @Reference
- private EventAdmin eventAdmin;
+ public static final String STORE_TYPE_OFFSETS = "lastRaisedEventOffset";
+
+ private final ConcurrentMap<String, Long> lastDistributedOffsets = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, LocalStore> localStores = new ConcurrentHashMap<>();
+
+ private final EventAdmin eventAdmin;
+
+ private final PubQueueProvider pubQueueCacheService;
- @Reference
- private PubQueueProvider pubQueueCacheService;
+ private final MessagingProvider messagingProvider;
- @Reference
- private MessagingProvider messagingProvider;
+ private final Topics topics;
- @Reference
- private Topics topics;
+ private final ResourceResolverFactory resolverFactory;
private Consumer<PackageDistributedMessage> sender;
- private boolean sendMsg;
+ private final boolean sendMsg;
+
+ private final boolean ensureEvent;
+
+ public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueCacheService, MessagingProvider messagingProvider, Topics topics, ResourceResolverFactory resolverFactory, boolean ensureEvent) {
+ this.eventAdmin = eventAdmin;
+ this.pubQueueCacheService = pubQueueCacheService;
+ this.messagingProvider = messagingProvider;
+ this.topics = topics;
+ this.resolverFactory = resolverFactory;
+ this.ensureEvent = ensureEvent;
- @Activate
- public void activate() {
sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
if (sendMsg) {
sender = messagingProvider.createSender(topics.getEventTopic());
}
+
LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic());
}
@@ -91,6 +103,12 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
*/
private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
long minOffset = offsets.get().findFirst().getAsLong();
+
+ if (ensureEvent) {
+ long lastDistributedOffset = lastDistributedOffsets.computeIfAbsent(pubAgentName, this::getLastStoredDistributedOffset);
+ minOffset = Math.min(offsets.get().findFirst().getAsLong(), lastDistributedOffset);
+ }
+
OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
offsets
.get()
@@ -99,6 +117,32 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
.forEach(msg -> notifyDistributed(pubAgentName, msg));
}
+    /**
+     * Loads the persisted last raised event offset for the given publisher agent,
+     * creating and caching the agent's local store on first use.
+     *
+     * @param pubAgentName name of the publisher agent
+     * @return the stored offset, or {@code Long.MAX_VALUE} as the default passed to the store
+     */
+    private long getLastStoredDistributedOffset(String pubAgentName) {
+        LocalStore agentStore = localStores.computeIfAbsent(pubAgentName, this::newLocalStore);
+        return agentStore.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
+    }
+
+    /**
+     * Creates the local store for a publisher agent. The store is built from the
+     * escaped combination of the messaging server URI and the package topic, plus
+     * the agent name.
+     *
+     * @param pubAgentName name of the publisher agent
+     * @return a new local store for that agent
+     */
+    private LocalStore newLocalStore(String pubAgentName) {
+        return new LocalStore(
+                resolverFactory,
+                escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic()),
+                pubAgentName);
+    }
+
+    /**
+     * Persists, for every publisher agent that has a local store, the offset of the
+     * last raised distributed event whenever it differs from the stored value.
+     *
+     * <p>An agent with no in-memory offset yet is skipped. Its store can already be
+     * cached while the offset is still being loaded, and writing the
+     * {@code Long.MAX_VALUE} placeholder at that point would overwrite a valid
+     * persisted offset.</p>
+     *
+     * <p>Failures are logged per agent and do not prevent the remaining agents
+     * from being processed.</p>
+     */
+    protected void storeLastDistributedOffset() {
+        for (Map.Entry<String, LocalStore> localStoreEntry : localStores.entrySet()) {
+            String pubAgentName = localStoreEntry.getKey();
+            LocalStore localStore = localStoreEntry.getValue();
+            Long knownOffset = lastDistributedOffsets.get(pubAgentName);
+            if (knownOffset == null) {
+                // Nothing has been loaded or raised for this agent yet: keep the persisted value untouched.
+                continue;
+            }
+            long lastDistributedOffset = knownOffset;
+            try {
+                // Loading is inside the try block so that a failing store does not abort the whole loop.
+                long lastStoredOffset = localStore.load(STORE_TYPE_OFFSETS, Long.MAX_VALUE);
+                if (lastDistributedOffset != lastStoredOffset) {
+                    localStore.store(STORE_TYPE_OFFSETS, lastDistributedOffset);
+                    LOG.info("The offset={} has been stored for the pubAgentName={}", lastDistributedOffset, pubAgentName);
+                }
+            } catch (Exception e) {
+                LOG.warn("Exception when storing the last distributed offset in the repository", e);
+            }
+        }
+    }
+
protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) {
LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId());
sendEvt(pubAgentName, queueItem);
@@ -130,6 +174,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
try {
Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
eventAdmin.sendEvent(distributed);
+ lastDistributedOffsets.put(pubAgentName, (Long)(queueItem.getOrDefault(QueueItemFactory.RECORD_OFFSET, Long.MAX_VALUE)));
} catch (Exception e) {
LOG.warn("Exception when sending package distributed event for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId(), e);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
new file mode 100644
index 0000000..c210dc1
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl;
+import org.apache.sling.discovery.impl.topology.TopologyViewImpl;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+/**
+ * Tests that {@link DistributedEventNotifierManager} registers the notifier according
+ * to the {@code deduplicateEvent} configuration and to the leadership reported by
+ * topology events.
+ */
+public class DistributedEventNotifierManagerTest {
+    @Mock
+    private EventAdmin eventAdmin;
+
+    @Mock
+    private PubQueueProvider pubQueueCacheService;
+
+    @Mock
+    private MessagingProvider messagingProvider;
+
+    @Spy
+    private Topics topics;
+
+    @Spy
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
+    @InjectMocks
+    private DistributedEventNotifierManager notifierManager;
+
+    private final BundleContext context = MockOsgi.newBundleContext();
+
+    @Before
+    public void before() {
+        initMocks(this);
+    }
+
+    @Test
+    public void testConfig() {
+        // Without deduplication the notifier is registered as soon as the component is activated.
+        activate(false);
+        assertTrue(notifierManager.isLeader());
+
+        notifierManager.deactivate();
+
+        // With deduplication the registration waits for a topology event.
+        activate(true);
+        assertFalse(notifierManager.isLeader());
+    }
+
+    @Test
+    public void testHandleTopologyEvent() {
+        activate(true);
+
+        TopologyView oldView = new TopologyViewImpl();
+        TopologyView newView = newViewWithInstanceDescription(true);
+
+        // PROPERTIES_CHANGED is ignored, so the notifier stays unregistered.
+        TopologyEvent event = new TopologyEvent(TopologyEvent.Type.PROPERTIES_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_INIT, null, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertTrue(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertTrue(notifierManager.isLeader());
+
+        // The local instance is no longer the leader in the new view.
+        newView = newViewWithInstanceDescription(false);
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+
+        event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGING, oldView, null);
+        notifierManager.handleTopologyEvent(event);
+        assertFalse(notifierManager.isLeader());
+    }
+
+    /** Activates the manager with the given value for the {@code deduplicateEvent} property. */
+    private void activate(boolean deduplicateEvent) {
+        Map<String, Boolean> props = new HashMap<>();
+        props.put("deduplicateEvent", deduplicateEvent);
+        notifierManager.activate(context, configuration(props, DistributedEventNotifierManager.Configuration.class));
+    }
+
+    /** Builds a topology view holding a single local instance with the given leadership flag. */
+    private TopologyView newViewWithInstanceDescription(boolean isLeader) {
+        InstanceDescription description = new DefaultInstanceDescriptionImpl(null, isLeader, true, "slingId", null);
+        return new TopologyViewImpl(Arrays.asList(description));
+    }
+
+    /** Converts a property map into an instance of the given configuration annotation type. */
+    private <T> T configuration(Map<String, Boolean> props, Class<T> clazz) {
+        return standardConverter()
+                .convert(props)
+                .to(clazz);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index 60670b5..3339859 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -18,97 +18,172 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
-import static java.util.Arrays.asList;
-import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
-import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.*;
import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.impl.PubQueueProviderImpl;
+import org.apache.sling.distribution.journal.queue.impl.QueueErrors;
import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.Spy;
+import org.mockito.*;
+import org.osgi.framework.BundleContext;
import org.osgi.service.event.EventAdmin;
+import java.io.Closeable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
public class PackageDistributedNotifierTest {
+ private static final String PUB_AGENT_NAME = "publish1";
+ private static final String SUB_AGENT_NAME = "subscriber1";
+
+ @Mock
+ private EventAdmin eventAdmin;
+
@Mock
private PubQueueProvider pubQueueCacheService;
@Mock
- private OffsetQueue<DistributionQueueItem> offsetQueue;
+ private MessagingProvider messagingProvider;
@Spy
private Topics topics;
+ @Spy
+ private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
@Mock
- private MessagingProvider messagingProvider;
+ private MessageSender<Object> sender;
+
+ @Captor
+ private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+
+ @Captor
+ private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
+
+ @Captor
+ private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor;
@Mock
- private EventAdmin eventAdmin;
+ private CacheCallback callback;
- @InjectMocks
- private PackageDistributedNotifier notifier;
+ @Mock
+ private Closeable poller;
@Mock
- private MessageSender<Object> sender;
+ private Closeable statPoller;
- @Captor
- private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+ private final BundleContext context = MockOsgi.newBundleContext();
+
+ private MessageHandler<PackageMessage> handler;
+
+ private PubQueueProviderImpl queueProvider;
+
+ private PackageDistributedNotifier notifier;
@Before
- public void before() {
+ public void before() throws URISyntaxException {
initMocks(this);
- when(offsetQueue.getItem(anyLong()))
- .thenReturn(createItem());
- when(pubQueueCacheService.getOffsetQueue(anyString(), anyLong()))
- .thenReturn(offsetQueue);
+ when(callback.createConsumer(handlerCaptor.capture()))
+ .thenReturn(poller);
+ when(messagingProvider.createPoller(
+ Mockito.eq(Topics.STATUS_TOPIC),
+ any(Reset.class),
+ statHandlerCaptor.capture()))
+ .thenReturn(statPoller);
+ URI serverURI = new URI("http://myserver.apache.org:1234/somepath");
+ when(messagingProvider.getServerUri()).thenReturn(serverURI);
when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic())))
.thenReturn(sender);
+
+ QueueErrors queueErrors = mock(QueueErrors.class);
+ queueProvider = new PubQueueProviderImpl(eventAdmin, queueErrors, callback, context);
+ handler = handlerCaptor.getValue();
+ for(int i = 0; i <= 20; i++)
+ handler.handle(info(i), packageMessage("packageid" + i, PUB_AGENT_NAME));
+
+ notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, true);
}
@Test
public void testChanged() throws Exception {
- notifier.activate();
TopologyViewDiff diffView = new TopologyViewDiff(
- buildView(new State("pub1", "sub1", 1000, 10, 0, -1, false)),
- buildView(new State("pub1", "sub1", 2000, 13, 0, -1, false)));
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 10, 0, -1, false)),
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 13, 0, -1, false)));
+ when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 11))
+ .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 11));
notifier.changed(diffView);
verify(sender, times(3)).accept(messageCaptor.capture());
}
- private DistributionQueueItem createItem() {
- Map<String, Object> properties = new HashMap<>();
- properties.put(QueueItemFactory.RECORD_OFFSET, 10l);
- properties.put(PROPERTY_REQUEST_PATHS, new String[] {"/test"});
- properties.put(PROPERTY_REQUEST_DEEP_PATHS, new String[] {"/test"});
- return new DistributionQueueItem("packageId", properties);
+ @Test
+ public void testPersistLastRaisedOffset() throws Exception {
+ TopologyViewDiff diffView1 = new TopologyViewDiff(
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 10, 0, -1, false)),
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 13, 0, -1, false)));
+ // there is no value for the last raised offset persisted in the author repository
+ when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 11))
+ .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 11));
+ notifier.changed(diffView1);
+ verify(sender, times(3)).accept(messageCaptor.capture());
+
+ notifier.storeLastDistributedOffset();
+
+ TopologyViewDiff diffView2 = new TopologyViewDiff(
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 1000, 15, 0, -1, false)),
+ buildView(new State(PUB_AGENT_NAME, SUB_AGENT_NAME, 2000, 20, 0, -1, false)));
+ // the last raised offset persisted in the author repository is 13
+ when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 13))
+ .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 13));
+ notifier.changed(diffView2);
+ verify(sender, times(3 + 5)).accept(messageCaptor.capture());
+
+ notifier.storeLastDistributedOffset();
+
+ notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, false);
+ // the last raised offset persisted in the author repository is not considered because `ensureEvent` is disabled
+ when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 16))
+ .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 16));
+ notifier.changed(diffView2);
+ verify(sender, times(3 + 5 + 5)).accept(messageCaptor.capture());
}
private TopologyView buildView(State ... state) {
return new TopologyView(new HashSet<>(asList(state)));
}
-}
\ No newline at end of file
+
+    /** Creates a mocked {@code MessageInfo} whose {@code getOffset()} returns the given offset. */
+    private MessageInfo info(long offset) {
+        MessageInfo messageInfo = Mockito.mock(MessageInfo.class);
+        doReturn(offset).when(messageInfo).getOffset();
+        return messageInfo;
+    }
+
+ /**
+ * Builds an ADD package message of type "journal" for the given package id and
+ * publisher agent, with a fixed publisher Sling id and a single path "path".
+ */
+ private PackageMessage packageMessage(String packageId, String pubAgentName) {
+ return PackageMessage.builder()
+ .pubAgentName(pubAgentName)
+ .pubSlingId("pub1SlingId")
+ .pkgId(packageId)
+ .reqType(PackageMessage.ReqType.ADD)
+ .pkgType("journal")
+ .paths(Collections.singletonList("path"))
+ .build();
+ }
+
+}