You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/23 09:51:03 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: Sling 9583 - Make queue code usable for other bundles (#53)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 2efca59 Sling 9583 - Make queue code usable for other bundles (#53)
2efca59 is described below
commit 2efca59b3348175be9f9ff8c39c9300cab9d03e9
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Thu Jul 23 11:50:55 2020 +0200
Sling 9583 - Make queue code usable for other bundles (#53)
* SLING-9577 - Switch to seeding thread
* SLING-9577 - Remove unnecessar parameters
* SLING-9577 - Remove unnecessary logging
* SLING-9577 - Add count so we can monitor numnber of seeding messages
* SLING-9577 - Start counting with 1
* SLING-9577 - Limit maximum number of messages sent on seeder thread
* SLING-9577 - Use exponential backoff and unlimited messages on seeder thread
* SLING-9583 - Delete SubQueue classes as they are not used anymore
* SLING-9583 - Combine parameters into value object
* SLING-9583 - Extract messaging code from PubQueueProviderImpl
* SLING-9583 - Extract messaging code from PubQueueCache
* SLING-9583 - Cleanup
* SLING-9583 - Integrate PubQueueCacheService into PubQueueProviderImpl
* SLING-9583 - checkstyle
* SLING-9583 - ignore sonar warnings
* SLING-9583 - ignore sonar warnings
* SLING-9583 - Increase coverage
* SLING-9583 - Adding test
* SLING-9583 - Move discovery into separate package
* SLING-9583 - Create PubQueueProvider using factory
* SLING-9583 - Sonar
* SLING-9583 - Remove QueueId
* SLING-9583 - Move all queue logic to PubQueueProvider
* SLING-9583 - Publish single PubQueueProvider service for compatibility
* SLING-9583 - Fix null annotation
* SLING-9583 - Fix null annotation
* SLING-9583 - Make queue API independent from discovery
* SLING-9583 - Improve readability of test
* SLING-9583 - Move queue package out of impl as it is exported
* SLING-9583 - Fix sonar issues
---
pom.xml | 2 +-
.../{publisher => discovery}/DiscoveryService.java | 40 ++--
.../impl/{publisher => discovery}/State.java | 2 +-
.../TopologyChangeHandler.java | 2 +-
.../{publisher => discovery}/TopologyView.java | 2 +-
.../{publisher => discovery}/TopologyViewDiff.java | 2 +-
.../TopologyViewManager.java | 4 +-
.../journal/impl/publisher/DistPublisherJMX.java | 2 +
.../impl/publisher/DistributionPublisher.java | 84 +++------
.../impl/publisher/MessagingCacheCallback.java | 122 ++++++++++++
.../impl/publisher/PackageDistributedNotifier.java | 44 +++--
.../impl/publisher/PubQueueProviderPublisher.java | 88 +++++++++
.../impl => publisher}/QueueCacheSeeder.java | 2 +-
.../{queue/impl => publisher}/RangePoller.java | 8 +-
.../impl/queue/impl/PubQueueCacheService.java | 138 --------------
.../impl/queue/impl/PubQueueProviderImpl.java | 155 ----------------
.../impl/queue/impl/QueueCacheCleanupTask.java | 56 ------
.../journal/impl/queue/impl/SubQueue.java | 165 -----------------
.../CacheCallback.java} | 18 +-
.../{impl/queue/impl => queue}/ClearCallback.java | 2 +-
.../journal/{impl => }/queue/OffsetQueue.java | 2 +-
.../journal/{impl => }/queue/PubQueueProvider.java | 24 ++-
.../PubQueueProviderFactory.java} | 18 +-
.../journal/{impl => }/queue/QueueItemFactory.java | 2 +-
.../distribution/journal/queue/QueueState.java | 50 +++++
.../journal/{impl => }/queue/impl/EntryUtil.java | 4 +-
.../{impl => }/queue/impl/OffsetQueueImpl.java | 4 +-
.../queue/impl/OffsetQueueImplMBean.java | 2 +-
.../journal/{impl => }/queue/impl/PubErrQueue.java | 4 +-
.../journal/{impl => }/queue/impl/PubQueue.java | 5 +-
.../{impl => }/queue/impl/PubQueueCache.java | 56 ++----
.../queue/impl/PubQueueProviderFactoryImpl.java | 46 +++++
.../journal/queue/impl/PubQueueProviderImpl.java | 204 +++++++++++++++++++++
.../{impl => }/queue/impl/QueueEntryFactory.java | 4 +-
.../package-info.java} | 10 +-
.../{impl/publisher => shared}/AgentId.java | 2 +-
.../DiscoveryServiceTest.java | 2 +-
.../impl/{publisher => discovery}/StateTest.java | 2 +-
.../TopologyViewDiffTest.java | 2 +-
.../TopologyViewManagerTest.java | 2 +-
.../{publisher => discovery}/TopologyViewTest.java | 3 +-
.../impl/publisher/DistPublisherJMXTest.java | 3 +
.../impl/publisher/DistributionPublisherTest.java | 52 +++---
.../impl/publisher/MessagingCacheCallbackTest.java | 184 +++++++++++++++++++
.../publisher/PackageDistributedNotifierTest.java | 54 ++++--
.../publisher/PubQueueProviderPublisherTest.java | 75 ++++++++
.../impl => publisher}/QueueCacheSeederTest.java | 2 +-
.../{queue/impl => publisher}/RangePollerTest.java | 4 +-
.../journal/impl/queue/impl/SubQueueTest.java | 73 --------
.../journal/impl/subscriber/SubscriberTest.java | 61 +++---
.../{impl => }/queue/QueueItemFactoryTest.java | 10 +-
.../{impl => }/queue/impl/EntryUtilTest.java | 2 +-
.../queue/impl/OffsetQueueImplJMXTest.java | 4 +-
.../{impl => }/queue/impl/OffsetQueueImplTest.java | 4 +-
.../{impl => }/queue/impl/PubQueueCacheTest.java | 119 ++++--------
.../queue/impl/PubQueueProviderTest.java | 90 +++++----
.../{impl => }/queue/impl/PubQueueTest.java | 12 +-
.../{impl/publisher => shared}/AgentIdTest.java | 2 +-
58 files changed, 1138 insertions(+), 999 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1be3cbc..a433d58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,7 +90,7 @@
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.converter</artifactId>
- <version>1.0.0</version>
+ <version>1.0.14</version>
<scope>test</scope>
</dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
similarity index 79%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index a88770e..35a5024 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static java.lang.String.format;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
@@ -32,6 +32,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.SubscriberConfig;
import org.apache.sling.distribution.journal.messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.AgentId;
import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.commons.io.IOUtils;
@@ -41,13 +42,12 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-
-import org.apache.sling.distribution.journal.MessageHandler;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.Reset;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,10 +79,10 @@ public class DiscoveryService implements Runnable {
@Reference
private Topics topics;
- @Reference
- private TopologyChangeHandler topologyChangeHandler;
+ @Reference(policyOption = ReferencePolicyOption.GREEDY, cardinality = ReferenceCardinality.OPTIONAL)
+ private volatile TopologyChangeHandler topologyChangeHandler; //NOSONAR
- private volatile ServiceRegistration<?> reg;
+ private volatile ServiceRegistration<?> reg; //NOSONAR
private final TopologyViewManager viewManager = new TopologyViewManager(REFRESH_TTL_MS);
@@ -105,7 +105,7 @@ public class DiscoveryService implements Runnable {
poller = messagingProvider.createPoller(
topics.getDiscoveryTopic(),
Reset.latest,
- create(DiscoveryMessage.class, new DiscoveryMessageHandler())
+ create(DiscoveryMessage.class, this::handleDiscovery)
);
startTopologyViewUpdaterTask(context);
LOG.info("Discovery service started");
@@ -140,7 +140,10 @@ public class DiscoveryService implements Runnable {
} else {
LOG.debug(msg);
}
- topologyChangeHandler.changed(diffView);
+ TopologyChangeHandler handler = topologyChangeHandler;
+ if (handler != null) {
+ handler.changed(diffView);
+ }
}
}
@@ -152,18 +155,13 @@ public class DiscoveryService implements Runnable {
reg = context.registerService(Runnable.class.getName(), this, props);
}
- private final class DiscoveryMessageHandler implements MessageHandler<DiscoveryMessage> {
-
- @Override
- public void handle(MessageInfo info, DiscoveryMessage disMsg) {
-
- long now = System.currentTimeMillis();
- AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
- for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
- SubscriberConfig subConfig = disMsg.getSubscriberConfiguration();
- State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
- viewManager.refreshState(subState);
- }
+ public void handleDiscovery(MessageInfo info, DiscoveryMessage disMsg) {
+ long now = System.currentTimeMillis();
+ AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
+ for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
+ SubscriberConfig subConfig = disMsg.getSubscriberConfiguration();
+ State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
+ viewManager.refreshState(subState);
}
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
index be649c4..bf007c6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/State.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/State.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import java.util.Objects;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
index aeb70c7..0a82791 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyChangeHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyChangeHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
index bdf4d34..ba7718d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyView.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyView.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collectors.groupingBy;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
index 235982c..1647795 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiff.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import java.util.HashSet;
import java.util.Map;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
index 04fd25a..cc7ac77 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManager.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import java.util.Map;
import java.util.Set;
@@ -33,7 +33,7 @@ public class TopologyViewManager {
*/
private final Map<String, State> states = new ConcurrentHashMap<>();
- private volatile TopologyView currentView = new TopologyView();
+ private volatile TopologyView currentView = new TopologyView(); //NOSONAR
private final long refreshTtl;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
index 9440ffe..48e69a5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
@@ -33,6 +33,8 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 6b9e77a..d14ad66 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -19,7 +19,6 @@
package org.apache.sling.distribution.journal.impl.publisher;
-import static java.util.stream.StreamSupport.stream;
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
import static org.apache.sling.distribution.DistributionRequestType.ADD;
@@ -27,34 +26,33 @@ import static org.apache.sling.distribution.DistributionRequestType.DELETE;
import static org.apache.sling.distribution.DistributionRequestType.TEST;
import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
+import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.shared.AgentState;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
@@ -75,7 +73,10 @@ import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
+
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
/**
@@ -106,9 +107,6 @@ public class DistributionPublisher implements DistributionAgent {
private PackageQueuedNotifier queuedNotifier;
@Reference
- private PubQueueProvider pubQueueProvider;
-
- @Reference
private DiscoveryService discoveryService;
@Reference
@@ -126,6 +124,9 @@ public class DistributionPublisher implements DistributionAgent {
@Reference
private DistributionMetricsService distributionMetricsService;
+ @Reference
+ private PubQueueProvider pubQueueProvider;
+
private String pubAgentName;
private String pkgType;
@@ -140,6 +141,8 @@ public class DistributionPublisher implements DistributionAgent {
private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
+ private Closeable statusPoller;
+
public DistributionPublisher() {
log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
REQ_TYPES.put(ADD, this::sendAndWait);
@@ -177,11 +180,19 @@ public class DistributionPublisher implements DistributionAgent {
"Current number of publish subscribers",
() -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
);
+
+ statusPoller = messagingProvider.createPoller(
+ topics.getStatusTopic(),
+ Reset.earliest,
+ HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus)
+ );
+
log.info(msg);
}
@Deactivate
public void deactivate() {
+ IOUtils.closeQuietly(statusPoller, pubQueueProvider);
reg.close();
componentReg.unregister();
String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
@@ -206,41 +217,17 @@ public class DistributionPublisher implements DistributionAgent {
@Nonnull
@Override
public Iterable<String> getQueueNames() {
-
- // Queues names are generated only for the subscriber agents which are
- // alive and are subscribed to the publisher agent name (pubAgentName).
- // The queue names match the subscriber agent identifier (subAgentId).
- //
- // If errors queues are enabled, an error queue name is generated which
- // follows the pattern "%s-error". The pattern is deliberately different
- // from the SCD on Jobs one ("error-%s") as we don't want to support
- // the UI ability to retry items from the error queue.
- Set<String> queueNames = new HashSet<>();
- TopologyView view = discoveryService.getTopologyView();
- for (String subAgentId : view.getSubscribedAgentIds(pubAgentName)) {
- queueNames.add(subAgentId);
- State subState = view.getState(subAgentId, pubAgentName);
- if (subState != null) {
- boolean errorQueueEnabled = (subState.getMaxRetries() >= 0);
- if (errorQueueEnabled) {
- queueNames.add(String.format("%s-error", subAgentId));
- }
- }
- }
- return Collections.unmodifiableCollection(queueNames);
+ return Collections.unmodifiableCollection(pubQueueProvider.getQueueNames(pubAgentName));
}
@Override
public DistributionQueue getQueue(String queueName) {
-
- // validate that queueName is a valid name returned by #getQueueNames
- if (stream(getQueueNames().spliterator(), true).noneMatch(queueName::equals)) {
- distributionMetricsService.getQueueAccessErrorCount().increment();
- return null;
- }
-
try {
- return queueName.endsWith("-error") ? getErrorQueue(queueName) : getPubQueue(queueName);
+ DistributionQueue queue = pubQueueProvider.getQueue(pubAgentName, queueName);
+ if (queue == null) {
+ distributionMetricsService.getQueueAccessErrorCount().increment();
+ }
+ return queue;
} catch (Exception e) {
distributionMetricsService.getQueueAccessErrorCount().increment();
throw e;
@@ -248,25 +235,6 @@ public class DistributionPublisher implements DistributionAgent {
}
@Nonnull
- private DistributionQueue getErrorQueue(String queueName) {
- AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
- return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
- }
-
- @CheckForNull
- private DistributionQueue getPubQueue(String queueName) {
- TopologyView view = discoveryService.getTopologyView();
- AgentId subAgentId = new AgentId(queueName);
- State state = view.getState(subAgentId.getAgentId(), pubAgentName);
- if (state != null) {
- return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1, state.getRetries(), state.isEditable());
- }
- return null;
- }
-
-
-
- @Nonnull
@Override
public DistributionLog getLog() {
return log;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
new file mode 100644
index 0000000..5bd9e87
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.ClearCallback;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessagingCacheCallback implements CacheCallback {
+ private Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private final MessagingProvider messagingProvider;
+
+ private final String packageTopic;
+
+ private final DistributionMetricsService distributionMetricsService;
+
+ private final DiscoveryService discoveryService;
+
+ private final Consumer<ClearCommand> commandSender;
+
+ public MessagingCacheCallback(
+ MessagingProvider messagingProvider,
+ String packageTopic,
+ DistributionMetricsService distributionMetricsService,
+ DiscoveryService discoveryService,
+ Consumer<ClearCommand> commandSender) {
+ this.messagingProvider = messagingProvider;
+ this.packageTopic = packageTopic;
+ this.distributionMetricsService = distributionMetricsService;
+ this.discoveryService = discoveryService;
+ this.commandSender = commandSender;
+ }
+
+ @Override
+ public Closeable createConsumer(MessageHandler<PackageMessage> handler) {
+ log.info("Starting consumer");
+ QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider.createSender(packageTopic)); //NOSONAR
+ Closeable poller = messagingProvider.createPoller( //NOSONAR
+ packageTopic,
+ Reset.latest,
+ create(PackageMessage.class, (info, message) -> { seeder.close(); handler.handle(info, message); })
+ );
+ seeder.startSeeding();
+ return () -> IOUtils.closeQuietly(seeder, poller);
+ }
+
+ @Override
+ public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException {
+ distributionMetricsService.getQueueCacheFetchCount().increment();
+ return new RangePoller(messagingProvider, packageTopic, minOffset, maxOffset)
+ .fetchRange();
+ }
+
+ @Override
+ public QueueState getQueueState(String pubAgentName, String subAgentId) {
+ TopologyView view = discoveryService.getTopologyView();
+ State state = view.getState(subAgentId, pubAgentName);
+ if (state == null) {
+ return null;
+ }
+ ClearCallback editableCallback = offset -> sendClearCommand(pubAgentName, new AgentId(subAgentId), offset);
+ ClearCallback clearCallback = state.isEditable() ? editableCallback : null;
+ long curOffset = state.getOffset();
+ int headRetries = state.getRetries();
+ int maxRetries = state.getMaxRetries();
+ return new QueueState(curOffset, headRetries, maxRetries, clearCallback);
+ }
+
+ private void sendClearCommand(String pubAgentName, AgentId subAgentId, long offset) {
+ ClearCommand commandMessage = ClearCommand.builder()
+ .pubAgentName(pubAgentName)
+ .subSlingId(subAgentId.getSlingId())
+ .subAgentName(subAgentId.getAgentName())
+ .offset(offset)
+ .build();
+ log.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subAgentId.getSlingId(), subAgentId.getAgentName(), offset);
+ commandSender.accept(commandMessage);
+ }
+
+ @Override
+ public Set<String> getSubscribedAgentIds(String pubAgentName) {
+ return discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName);
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index ea3adc5..d5dce4e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -26,11 +26,13 @@ import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -57,7 +59,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
private EventAdmin eventAdmin;
@Reference
- private PubQueueCacheService pubQueueCacheService;
+ private PubQueueProvider pubQueueCacheService;
@Reference
private MessagingProvider messagingProvider;
@@ -91,28 +93,32 @@ public class PackageDistributedNotifier implements TopologyChangeHandler {
long minOffset = offsets.get().findFirst().getAsLong();
OffsetQueue<DistributionQueueItem> offsetQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
offsets
- .get()
- .mapToObj(offsetQueue::getItem)
- .filter(Objects::nonNull)
- .forEach(msg -> processOffset(pubAgentName, msg));
+ .get()
+ .mapToObj(offsetQueue::getItem)
+ .filter(Objects::nonNull)
+ .forEach(msg -> processOffset(pubAgentName, msg));
}
protected void processOffset(String pubAgentName, DistributionQueueItem queueItem) {
sendEvt(pubAgentName, queueItem);
- sendMsg(pubAgentName, queueItem);
+ if (sendMsg) {
+ sendMsg(pubAgentName, queueItem);
+ }
}
private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
- if (sendMsg) {
- PackageDistributedMessage msg = new PackageDistributedMessage();
- msg.pubAgentName = pubAgentName;
- msg.packageId = queueItem.getPackageId();
- msg.offset = (Long) queueItem.get(QueueItemFactory.RECORD_OFFSET);
- msg.paths = (String[]) queueItem.get(PROPERTY_REQUEST_PATHS);
- msg.deepPaths = (String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS);
-
- sender.accept(msg);
- }
+ PackageDistributedMessage msg = createDistributedMessage(pubAgentName, queueItem);
+ sender.accept(msg);
+ }
+
+ private PackageDistributedMessage createDistributedMessage(String pubAgentName, DistributionQueueItem queueItem) {
+ return PackageDistributedMessage.builder()
+ .pubAgentName(pubAgentName)
+ .packageId(queueItem.getPackageId())
+ .offset((Long) queueItem.get(QueueItemFactory.RECORD_OFFSET))
+ .paths((String[]) queueItem.get(PROPERTY_REQUEST_PATHS))
+ .deepPaths((String[]) queueItem.get(PROPERTY_REQUEST_DEEP_PATHS))
+ .build();
}
private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
new file mode 100644
index 0000000..dd390d2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import java.util.Hashtable;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+
+/**
+ * Normally PubQueueProvider should be created per publish agent.
+ * For compatibility with current code and to save on number of consumers
+ * we must make sure to publish only one for the messaging based impl.
+ */
+@Component
+public class PubQueueProviderPublisher {
+ @Reference
+ private MessagingProvider messagingProvider;
+
+ @Reference
+ private DiscoveryService discoveryService;
+
+ @Reference
+ private Topics topics;
+
+ @Reference
+ JournalAvailable journalAvailable;
+
+ @Reference
+ private DistributionMetricsService distributionMetricsService;
+
+ @Reference
+ private PubQueueProviderFactory pubQueueProviderFactory;
+
+ private PubQueueProvider pubQueueProvider;
+
+ private ServiceRegistration<PubQueueProvider> reg;
+
+ @Activate
+ public void activate(BundleContext context) {
+ Consumer<ClearCommand> commandSender = messagingProvider.createSender(topics.getCommandTopic());
+ CacheCallback callback = new MessagingCacheCallback(
+ messagingProvider,
+ topics.getPackageTopic(),
+ distributionMetricsService,
+ discoveryService,
+ commandSender);
+ this.pubQueueProvider = pubQueueProviderFactory.create(callback);
+ reg = context.registerService(PubQueueProvider.class, this.pubQueueProvider, new Hashtable<>());
+ }
+
+ @Deactivate
+ public void deactivate() {
+ IOUtils.closeQuietly(this.pubQueueProvider);
+ reg.unregister();
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
index 26a6eaa..bbf27b6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
index 72e7c4a..040db85 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
@@ -55,12 +55,12 @@ public class RangePoller {
public RangePoller(MessagingProvider messagingProvider,
String packageTopic,
long minOffset,
- long maxOffset) {
- this.maxOffset = maxOffset;
+ long maxOffsetExclusive) {
+ this.maxOffset = maxOffsetExclusive;
this.minOffset = minOffset;
this.messages = new ArrayList<>();
String assign = messagingProvider.assignTo(minOffset);
- LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
+ LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
headPoller = messagingProvider.createPoller(
packageTopic, Reset.earliest, assign,
create(PackageMessage.class, this::handlePackage)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
deleted file mode 100644
index b8d6692..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(immediate = true, service = PubQueueCacheService.class)
-@ParametersAreNonnullByDefault
-public class PubQueueCacheService {
-
- private static final Logger LOG = LoggerFactory.getLogger(PubQueueCacheService.class);
-
- /**
- * The minimum size to collect the cache. Each cache entry requires
- * around 500B of heap space. 10'000 entries ~= 5MB on heap.
- */
- private static final int CLEANUP_THRESHOLD = 10_000;
-
- /**
- * Will cause the cache to be cleared when we loose the journal
- */
- @Reference
- private JournalAvailable journalAvailable;
-
- /**
- * The cache is active only when at least one DistributionSubscriber agent is configured.
- */
- @Reference
- private PublisherConfigurationAvailable publisherConfigurationAvailable;
-
- @Reference
- private MessagingProvider messagingProvider;
-
- @Reference
- private Topics topics;
-
- @Reference
- private EventAdmin eventAdmin;
-
- @Reference
- private DistributionMetricsService distributionMetricsService;
-
- private volatile PubQueueCache cache;
-
- public PubQueueCacheService() {}
-
- public PubQueueCacheService(MessagingProvider messagingProvider,
- Topics topics,
- EventAdmin eventAdmin) {
- this.messagingProvider = messagingProvider;
- this.topics = topics;
- this.eventAdmin = eventAdmin;
- }
-
- @Activate
- public void activate() {
- cache = newCache();
- LOG.info("Started Publisher queue cache service");
- }
-
- @Deactivate
- public void deactivate() {
- PubQueueCache queueCache = this.cache;
- if (queueCache != null) {
- queueCache.close();
- }
- LOG.info("Stopped Publisher queue cache service");
- }
-
- @Nonnull
- public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
- try {
- return cache.getOffsetQueue(pubAgentName, minOffset);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- /**
- * The cleanup renew the cache when
- * a capacity threshold has been reached.
- */
- public void cleanup() {
- PubQueueCache queueCache = this.cache;
- if (queueCache != null) {
- int size = queueCache.size();
- if (size > CLEANUP_THRESHOLD) {
- LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
- queueCache.close();
- cache = newCache();
- } else {
- LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
- }
- }
- }
-
- private PubQueueCache newCache() {
- String topic = topics.getPackageTopic();
- MessageSender<PackageMessage> sender = messagingProvider.createSender(topic);
- QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
- return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder);
- }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
deleted file mode 100644
index c019c09..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.messages.ClearCommand;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * As reading all messages is an expensive operation this component is activated lazily only when requested by the Publisher.
- */
-@Component
-@ParametersAreNonnullByDefault
-public class PubQueueProviderImpl implements PubQueueProvider {
-
- private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
-
- /*
- * (pubAgentName#subAgentId x OffsetQueue)
- */
- private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
-
- @Reference
- private MessagingProvider messagingProvider;
-
- @Reference
- private Topics topics;
-
- @Reference
- private PubQueueCacheService pubQueueCacheService;
-
- private Closeable statusPoller;
-
- private Consumer<ClearCommand> sender;
-
- public PubQueueProviderImpl() {
- }
-
- public PubQueueProviderImpl(
- PubQueueCacheService pubQueueCacheService,
- MessagingProvider messagingProvider,
- Topics topics) {
- this.pubQueueCacheService = pubQueueCacheService;
- this.messagingProvider = messagingProvider;
- this.topics = topics;
- }
-
- @Activate
- public void activate() {
- statusPoller = messagingProvider.createPoller(
- topics.getStatusTopic(),
- Reset.earliest,
- create(PackageStatusMessage.class, this::handleStatus)
- );
- sender = messagingProvider.createSender(topics.getCommandTopic());
- LOG.info("Started Publisher queue provider service");
- }
-
- @Deactivate
- public void deactivate() {
- IOUtils.closeQuietly(statusPoller);
- LOG.info("Stopped Publisher queue provider service");
- }
-
- @Nonnull
- @Override
- public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
- OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
- ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
- ClearCallback callback = editable ? editableCallback : null;
- return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
- }
-
- @Nonnull
- @Override
- public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
- String errorQueueKey = errorQueueKey(pubAgentName, subSlingId, subAgentName);
- OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
- long headOffset = errorQueue.getHeadOffset();
- final OffsetQueue<DistributionQueueItem> agentQueue;
- if (headOffset < 0) {
- agentQueue = new OffsetQueueImpl<>();
- } else {
- long minReferencedOffset = errorQueue.getItem(headOffset);
- agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset);
- }
-
- return new PubErrQueue(queueName, agentQueue, errorQueue);
- }
-
- public void handleStatus(MessageInfo info, PackageStatusMessage message) {
- if (message.getStatus() == Status.REMOVED_FAILED) {
- String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
- OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
- errorQueue.putItem(info.getOffset(), message.getOffset());
- }
- }
-
- @Nonnull
- private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
- return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
- }
-
- private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
- ClearCommand commandMessage = ClearCommand.builder()
- .subSlingId(subSlingId)
- .subAgentName(subAgentName)
- .offset(offset)
- .build();
- LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
- sender.accept(commandMessage);
- }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
deleted file mode 100644
index b2bd673..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-
-/**
- * Periodical task to cleanup the resources
- * used by the cache.
- */
-@Component(
- service = Runnable.class,
- property = {
- PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
- PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
- })
-@ParametersAreNonnullByDefault
-public class QueueCacheCleanupTask implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(QueueCacheCleanupTask.class);
-
- @Reference
- private PubQueueCacheService queueCacheService;
-
- @Override
- public void run() {
- LOG.info("Starting package cache cleanup task");
- queueCacheService.cleanup();
- LOG.info("Stopping package cache cleanup task");
- }
-}
-
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
deleted file mode 100644
index 44dd6f8..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
-import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
-import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
-import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
-
-@ParametersAreNonnullByDefault
-public class SubQueue implements DistributionQueue {
-
- private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
-
- private final DistributionQueueItem headItem;
-
- private final PackageRetries packageRetries;
-
- private final String queueName;
-
- private final QueueEntryFactory entryFactory;
-
- public SubQueue(String queueName,
- @Nullable
- DistributionQueueItem headItem,
- PackageRetries packageRetries) {
- this.headItem = headItem;
- this.queueName = Objects.requireNonNull(queueName);
- this.packageRetries = Objects.requireNonNull(packageRetries);
- this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
- }
-
- @Nonnull
- @Override
- public String getName() {
- return queueName;
- }
-
- @Override
- public DistributionQueueEntry add(DistributionQueueItem queueItem) {
- throw new UnsupportedOperationException("Unsupported add operation");
- }
-
- @Override
- @CheckForNull
- public DistributionQueueEntry getHead() {
- return entryFactory.create(headItem);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
- final List<DistributionQueueEntry> entries;
- if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
- entries = Collections.singletonList(entryFactory.create(headItem));
- } else {
- entries = Collections.emptyList();
- }
- return Collections.unmodifiableList(entries);
- }
-
- @Override
- public DistributionQueueEntry getEntry(String entryId) {
- return (entryId.equals(EntryUtil.entryId(headItem)))
- ? entryFactory.create(headItem)
- : null;
- }
-
- @Override
- public DistributionQueueEntry remove(String entryId) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> clear(int limit) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public DistributionQueueStatus getStatus() {
- final DistributionQueueState queueState;
- final int itemsCount;
- DistributionQueueEntry headEntry = getHead();
- if (headEntry != null) {
- itemsCount = 1;
- DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
- if (itemState == QUEUED) {
- queueState = RUNNING;
- } else {
- queueState = BLOCKED;
- }
- } else {
- itemsCount = 0;
- queueState = IDLE;
- }
-
- return new DistributionQueueStatus(itemsCount, queueState);
- }
-
- @Override
- @Nonnull
- public DistributionQueueType getType() {
- return ORDERED;
- }
-
- @Override
- public boolean hasCapability(String capability) {
- return false;
- }
-
- private int attempts(DistributionQueueItem queueItem) {
- String entryId = EntryUtil.entryId(queueItem);
- return packageRetries.get(entryId);
- }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
similarity index 55%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index 0396b6e..c42306f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -16,13 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue;
-import javax.annotation.ParametersAreNonnullByDefault;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Set;
-@ParametersAreNonnullByDefault
-public interface ClearCallback {
-
- void clear(long offset);
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+public interface CacheCallback {
+ Closeable createConsumer(MessageHandler<PackageMessage> handler);
+ List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException;
+ QueueState getQueueState(String pubAgentName, String subAgentId);
+ Set<String> getSubscribedAgentIds(String pubAgentName);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
index 0396b6e..ccdf76a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/ClearCallback.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
index 7ed776f..c7ecdae 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/OffsetQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/OffsetQueue.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
similarity index 56%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
index 178f6f2..6038afb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
@@ -16,20 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
+
+import java.io.Closeable;
+import java.util.Set;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@ParametersAreNonnullByDefault
-public interface PubQueueProvider {
+public interface PubQueueProvider extends Closeable {
- @Nonnull
- DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
+ @Nullable
+ DistributionQueue getQueue(String pubAgentName, String queueName);
@Nonnull
- DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
+ OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset);
+
+ void handleStatus(MessageInfo info, PackageStatusMessage message);
+
+ /**
+ * Get queue names for alive subscribed subscriber agents.
+ */
+ Set<String> getQueueNames(String pubAgentName);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
similarity index 57%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
index 178f6f2..7f8e913 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProviderFactory.java
@@ -16,20 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
-import javax.annotation.Nonnull;
-import javax.annotation.ParametersAreNonnullByDefault;
+public interface PubQueueProviderFactory {
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
+ PubQueueProvider create(CacheCallback callback);
-@ParametersAreNonnullByDefault
-public interface PubQueueProvider {
-
- @Nonnull
- DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
-
- @Nonnull
- DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
index 855a757..a026efb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueItemFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_PACKAGE_TYPE;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java
new file mode 100644
index 0000000..eae9d0d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueueState.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue;
+
+public class QueueState {
+
+ private final long lastProcessedOffset;
+ private final int headRetries;
+ private final int maxRetries;
+ private final ClearCallback clearCallback;
+
+ public QueueState(long lastProcessedOffset, int headRetries, int maxRetries, ClearCallback clearCallback) {
+ this.lastProcessedOffset = lastProcessedOffset;
+ this.headRetries = headRetries;
+ this.maxRetries = maxRetries;
+ this.clearCallback = clearCallback;
+ }
+
+ public long getLastProcessedOffset() {
+ return lastProcessedOffset;
+ }
+
+ public int getHeadRetries() {
+ return headRetries;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public ClearCallback getClearCallback() {
+ return clearCallback;
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
similarity index 91%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
index 529fc52..9b9a84b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/EntryUtil.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueItem;
public final class EntryUtil {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
index 941cf66..babdfe0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
@ParametersAreNonnullByDefault
public class OffsetQueueImpl<T> implements OffsetQueue<T>, OffsetQueueImplMBean {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
similarity index 93%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
copy to src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
index e4ef634..9c320a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplMBean.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
public interface OffsetQueueImplMBean {
long getHeadOffset();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
index aaa7b1c..69f1f59 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import java.util.ArrayList;
import java.util.List;
@@ -25,7 +25,7 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index 23aaf15..3ca97fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static java.util.Collections.emptyList;
import static java.util.stream.StreamSupport.stream;
@@ -39,7 +39,8 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.ClearCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
index fdd6ff7..6022f43 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.groupingBy;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
import java.util.HashSet;
@@ -40,20 +39,17 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
/**
* Cache the distribution packages fetched from the package topic.
@@ -71,7 +67,7 @@ public class PubQueueCache {
private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
- private static final long MAX_FETCH_WAIT_MS = 60 * 1000; // 1 minute
+ private static final long MAX_FETCH_WAIT_MS = 60 * 1000l; // 1 minute
/**
* (pubAgentName x OffsetQueue)
@@ -102,35 +98,16 @@ public class PubQueueCache {
private final Set<JMXRegistration> jmxRegs = new HashSet<>();
- private final MessagingProvider messagingProvider;
-
private final EventAdmin eventAdmin;
- private volatile Closeable tailPoller;
-
- private final QueueCacheSeeder seeder;
+ private volatile Closeable tailPoller; //NOSONAR
- private final String topic;
-
- private final DistributionMetricsService distributionMetricsService;
+ private final CacheCallback callback;
- public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) {
- this.messagingProvider = messagingProvider;
+ public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
this.eventAdmin = eventAdmin;
- this.distributionMetricsService = distributionMetricsService;
- this.topic = topic;
- this.seeder = seeder;
- startPoller();
- this.seeder.startSeeding();
- }
-
- private void startPoller() {
- LOG.info("Starting consumer");
- tailPoller = messagingProvider.createPoller(
- this.topic,
- Reset.latest,
- create(PackageMessage.class, this::handlePackage)
- );
+ this.callback = callback;
+ tailPoller = callback.createConsumer(this::handlePackage);
}
@Nonnull
@@ -148,7 +125,6 @@ public class PubQueueCache {
public void close() {
IOUtils.closeQuietly(tailPoller);
- IOUtils.closeQuietly(seeder);
jmxRegs.forEach(IOUtils::closeQuietly);
}
@@ -201,12 +177,8 @@ public class PubQueueCache {
* cache.
*/
private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
- distributionMetricsService.getQueueCacheFetchCount().increment();
- RangePoller headPoller = new RangePoller(messagingProvider,
- topic,
- requestedMinOffset,
- cachedMinOffset);
- merge(headPoller.fetchRange());
+ List<FullMessage<PackageMessage>> messages = callback.fetchRange(requestedMinOffset, cachedMinOffset);
+ merge(messages);
updateMinOffset(requestedMinOffset);
}
@@ -229,7 +201,6 @@ public class PubQueueCache {
}
private void merge(List<FullMessage<PackageMessage>> messages) {
- LOG.debug("Merging fetched offsets");
messages.stream()
.filter(this::isNotTestMessage)
.collect(groupingBy(message -> message.getMessage().getPubAgentName()))
@@ -251,6 +222,8 @@ public class PubQueueCache {
}
private void sendQueuedEvent(FullMessage<PackageMessage> fMessage) {
+ long offset = fMessage.getInfo().getOffset();
+ LOG.info("Queueing message package-id={}, offset={}", fMessage.getMessage().getPkgId(), offset);
PackageMessage message = fMessage.getMessage();
final Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
eventAdmin.postEvent(queuedEvent);
@@ -274,7 +247,6 @@ public class PubQueueCache {
}
private void handlePackage(final MessageInfo info, final PackageMessage message) {
- seeder.close();
merge(singletonList(new FullMessage<>(info, message)));
updateMaxOffset(info.getOffset());
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
new file mode 100644
index 0000000..2549d65
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+@Component
+public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory {
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ private BundleContext context;
+
+ public void activate(BundleContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public PubQueueProvider create(CacheCallback callback) {
+ return new PubQueueProviderImpl(eventAdmin, callback, context);
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
new file mode 100644
index 0000000..154ab64
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ParametersAreNonnullByDefault
+public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
+ /**
+ * The minimum size to collect the cache. Each cache entry requires
+ * around 500B of heap space. 10'000 entries ~= 5MB on heap.
+ */
+ private static final int CLEANUP_THRESHOLD = 10_000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
+
+ private final EventAdmin eventAdmin;
+
+ private final CacheCallback callback;
+
+ private volatile PubQueueCache cache; //NOSONAR
+
+ /*
+ * (pubAgentName#subAgentId x OffsetQueue)
+ */
+ private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
+
+ private ServiceRegistration<?> reg;
+
+ public PubQueueProviderImpl(EventAdmin eventAdmin, CacheCallback callback, BundleContext context) {
+ this.eventAdmin = eventAdmin;
+ this.callback = callback;
+ cache = newCache();
+ startCleanupTask(context);
+ LOG.info("Started Publisher queue provider service");
+ }
+
+ private void startCleanupTask(BundleContext context) {
+ // Register periodic task to update the topology view
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(PROPERTY_SCHEDULER_CONCURRENT, false);
+ props.put(PROPERTY_SCHEDULER_PERIOD, 12*60*60L); // every 12 h
+ reg = context.registerService(Runnable.class.getName(), this, props);
+ }
+
+ @Override
+ public void close() {
+ PubQueueCache queueCache = this.cache;
+ if (queueCache != null) {
+ queueCache.close();
+ }
+ if (reg != null) {
+ reg.unregister();
+ }
+ LOG.info("Stopped Publisher queue provider service");
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Starting package cache cleanup task");
+ PubQueueCache queueCache = this.cache;
+ if (queueCache != null) {
+ int size = queueCache.size();
+ if (size > CLEANUP_THRESHOLD) {
+ LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
+ queueCache.close();
+ cache = newCache();
+ } else {
+ LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
+ }
+ }
+ LOG.info("Stopping package cache cleanup task");
+ }
+
+ @Nonnull
+ @Override
+ public Set<String> getQueueNames(String pubAgentName) {
+ // Queues names are generated only for the subscriber agents which are
+ // alive and are subscribed to the publisher agent name (pubAgentName).
+ // The queue names match the subscriber agent identifier (subAgentId).
+ //
+ // If errors queues are enabled, an error queue name is generated which
+ // follows the pattern "%s-error". The pattern is deliberately different
+ // from the SCD on Jobs one ("error-%s") as we don't want to support
+ // the UI ability to retry items from the error queue.
+ Set<String> queueNames = new HashSet<>();
+ for (String subAgentId : callback.getSubscribedAgentIds(pubAgentName)) {
+ queueNames.add(subAgentId);
+ QueueState subState = callback.getQueueState(pubAgentName, subAgentId);
+ if (subState != null) {
+ boolean errorQueueEnabled = (subState.getMaxRetries() >= 0);
+ if (errorQueueEnabled) {
+ queueNames.add(String.format("%s-error", subAgentId));
+ }
+ }
+ }
+ return queueNames;
+ }
+
+ @Nullable
+ @Override
+ public DistributionQueue getQueue(String pubAgentName, String queueName) {
+ if (queueName.endsWith("-error")) {
+ return getErrorQueue(pubAgentName, queueName);
+ } else {
+ QueueState state = callback.getQueueState(pubAgentName, queueName);
+ if (state == null) {
+ return null;
+ }
+ long minOffset = state.getLastProcessedOffset() + 1; // Start from offset after last processed
+ OffsetQueue<DistributionQueueItem> agentQueue = getOffsetQueue(pubAgentName, minOffset);
+ return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), state.getClearCallback());
+ }
+ }
+
+ @Nonnull
+ private DistributionQueue getErrorQueue(String pubAgentName, String queueName) {
+ AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
+ String errorQueueKey = getErrorQueueKey(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName());
+ OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
+ long headOffset = errorQueue.getHeadOffset();
+ final OffsetQueue<DistributionQueueItem> agentQueue;
+ if (headOffset < 0) {
+ agentQueue = new OffsetQueueImpl<>();
+ } else {
+ long minReferencedOffset = errorQueue.getItem(headOffset);
+ agentQueue = getOffsetQueue(pubAgentName, minReferencedOffset);
+ }
+
+ return new PubErrQueue(queueName, agentQueue, errorQueue);
+ }
+
+ @Nonnull
+ public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
+ try {
+ return cache.getOffsetQueue(pubAgentName, minOffset);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void handleStatus(MessageInfo info, PackageStatusMessage message) {
+ if (message.getStatus() == Status.REMOVED_FAILED) {
+ String errorQueueKey = getErrorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
+ OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
+ errorQueue.putItem(info.getOffset(), message.getOffset());
+ }
+ }
+
+ private String getErrorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
+ return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
+ }
+
+ private PubQueueCache newCache() {
+ return new PubQueueCache(eventAdmin, callback);
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
index 47a9f25..a29ef93 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
@@ -24,7 +24,7 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUE
import java.util.Calendar;
import java.util.function.ToIntFunction;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java b/src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
rename to src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
index e4ef634..8086ed2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplMBean.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/package-info.java
@@ -16,10 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
-public interface OffsetQueueImplMBean {
- long getHeadOffset();
- long getTailOffset();
- int getSize();
-}
+
+@org.osgi.annotation.versioning.Version("1.0.0")
+@org.osgi.annotation.bundle.Export
+package org.apache.sling.distribution.journal.queue;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
index 6887ac8..6f7b45a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentId.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentId.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.shared;
import java.util.Objects;
import java.util.UUID;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
index 035f8cb..dda463d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
index 67d3f44..0b7df80 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/StateTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/StateTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static java.lang.Math.abs;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
similarity index 99%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
index 1882f33..d46d2e5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewDiffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewDiffTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
index 2fa730a..2bb3d12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewManagerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewManagerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
similarity index 99%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
index fc1525d..60b4704 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/TopologyViewTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/TopologyViewTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.impl.discovery;
import static java.lang.Math.abs;
import static java.util.Arrays.asList;
@@ -37,7 +37,6 @@ import java.util.stream.Collectors;
import org.junit.Test;
-
public class TopologyViewTest {
private static final Random RAND = new Random();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 0878999..abfb083 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,9 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 9f9de96..de4f552 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -54,8 +55,11 @@ import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl;
+import org.apache.sling.distribution.journal.queue.impl.PubQueue;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -140,9 +144,6 @@ public class DistributionPublisherTest {
@Mock
private PackageQueuedNotifier queuedNotifier;
- @Mock
- private TopologyView topology;
-
@Captor
private ArgumentCaptor<PackageMessage> pkgCaptor;
@@ -215,39 +216,37 @@ public class DistributionPublisherTest {
@Test
public void testQueueNames() throws DistributionException, IOException {
- when(discoveryService.getTopologyView()).thenReturn(topology);
- when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
- State state = stateWithMaxRetries(-1);
- when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
+ when(pubQueueProvider.getQueueNames(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
Iterable<String> names = publisher.getQueueNames();
assertThat(names, contains(QUEUE_NAME));
}
@Test
public void testQueueNamesWithErrorQueue() throws DistributionException, IOException {
- when(discoveryService.getTopologyView()).thenReturn(topology);
- when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
- State state = new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, 1, false);
- when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
+ when(pubQueueProvider.getQueueNames(Mockito.eq(PUB1AGENT1)))
+ .thenReturn(new HashSet<>(Arrays.asList(QUEUE_NAME, QUEUE_NAME + "-error")));
Iterable<String> names = publisher.getQueueNames();
assertThat(names, containsInAnyOrder(QUEUE_NAME + "-error", QUEUE_NAME));
}
@Test
public void testGetQueue() throws DistributionException, IOException {
- when(discoveryService.getTopologyView()).thenReturn(topology);
- when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
- State state = stateWithMaxRetries(1);
- when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
- publisher.getQueue(QUEUE_NAME);
- publisher.getQueue(QUEUE_NAME + "-error");
- // TODO Add assertions
+ when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME))
+ .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+ DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
+ assertThat(queue, notNullValue());
+ }
+
+ @Test
+ public void testGetErrorQueue() throws DistributionException, IOException {
+ when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME + "-error"))
+ .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+ DistributionQueue queue = publisher.getQueue(QUEUE_NAME + "-error");
+ assertThat(queue, notNullValue());
}
@Test
public void testGetWrongQueue() throws DistributionException, IOException {
- when(discoveryService.getTopologyView()).thenReturn(topology);
- when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
Counter counter = new TestCounter();
when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter);
@@ -258,12 +257,7 @@ public class DistributionPublisherTest {
@Test
public void testGetQueueErrorMetrics() throws DistributionException, IOException {
- when(discoveryService.getTopologyView()).thenReturn(topology);
- when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
- State state = stateWithMaxRetries(1);
- when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
- AgentId subAgentId = new AgentId(QUEUE_NAME);
- when(pubQueueProvider.getQueue(PUB1AGENT1, subAgentId.getSlingId(), subAgentId.getAgentName(), QUEUE_NAME, 2, 0, false))
+ when(pubQueueProvider.getQueue(Mockito.any(), Mockito.any()))
.thenThrow(new RuntimeException("Error"));
Counter counter = new TestCounter();
@@ -276,10 +270,6 @@ public class DistributionPublisherTest {
assertEquals("Wrong getQueue error counter",1, counter.getCount());
}
- private State stateWithMaxRetries(int maxRetries) {
- return new State(PUB1AGENT1, SUBAGENT1, 0, 1, 0, maxRetries, false);
- }
-
private PackageMessage mockPackage(DistributionRequest request) throws IOException {
return PackageMessage.builder()
.pkgId("myid")
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
new file mode 100644
index 0000000..befa6e3
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.queue.QueueState;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MessagingCacheCallbackTest {
+ private static final String SUBAGENT_NAME_1 = "subagent1";
+ private static final long CLEAR_OFFSET = 7;
+ private static final long CURRENT_OFFSET = 1l;
+ private static final int HEAD_RETRIES = 2;
+ private static final int MAX_RETRIES = 3;
+
+ private static final String PUB1AGENT1 = "agent1";
+
+ private static final String SLINGID1 = UUID.randomUUID().toString();
+ private static final String SUBAGENT_ID1 = SLINGID1 +"-" + SUBAGENT_NAME_1;
+
+
+ @Mock
+ private MessagingProvider messagingProvider;
+
+ @Spy
+ private Topics topics;
+
+ @Mock
+ private JournalAvailable journalAvailable;
+
+ @Mock
+ private DistributionMetricsService distributionMetricsService;
+
+ @Mock
+ private MessageHandler<PackageMessage> handler;
+
+ @Mock
+ private MessageSender<Object> sender;
+
+ @Mock
+ private DiscoveryService discovery;
+
+ @Mock
+ private Counter counter;
+
+ @InjectMocks
+ private MessagingCacheCallback callback;
+
+ @Captor
+ private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+
+ @Captor
+ private ArgumentCaptor<ClearCommand> clearCommandCaptor;
+
+ @Test
+ public void testCreateConsumer() throws Exception {
+ when(messagingProvider.createSender(Mockito.any())).thenReturn(sender);
+ Closeable poller = callback.createConsumer(handler);
+ assertThat(poller, notNullValue());
+
+ poller.close();
+ }
+
+ @Test
+ public void testFetchRange() throws Exception {
+ when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter);
+ when(messagingProvider.assignTo(Mockito.eq(10l))).thenReturn("0:10");
+ CompletableFuture<List<FullMessage<PackageMessage>>> result = CompletableFuture.supplyAsync(this::fetch);
+ verify(messagingProvider, timeout(100000)).createPoller(
+ Mockito.anyString(),
+ Mockito.eq(Reset.earliest),
+ Mockito.eq("0:10"),
+ handlerCaptor.capture());
+ simulateMessage(19);
+ simulateMessage(20);
+ List<FullMessage<PackageMessage>> messages = result.get(100, TimeUnit.SECONDS);
+ assertThat(messages.size(), equalTo(1));
+ }
+
+ @Test
+ public void testGetSubscribedAgentIds() {
+ TopologyView topology = createTopologyView();
+ when(discovery.getTopologyView()).thenReturn(topology);
+ Set<String> agentIds = callback.getSubscribedAgentIds(PUB1AGENT1);
+ assertThat(agentIds.size(), equalTo(1));
+ assertThat(agentIds.iterator().next(), equalTo(SUBAGENT_ID1));
+ }
+
+ @Test
+ public void testGetQueueState() {
+ TopologyView topology = createTopologyView();
+ when(discovery.getTopologyView()).thenReturn(topology);
+
+ QueueState queueState = callback.getQueueState(PUB1AGENT1, SUBAGENT_ID1);
+
+ assertThat(queueState.getLastProcessedOffset(), equalTo(CURRENT_OFFSET));
+ assertThat(queueState.getHeadRetries(), equalTo(HEAD_RETRIES));
+ assertThat(queueState.getMaxRetries(), equalTo(MAX_RETRIES));
+
+ queueState.getClearCallback().clear(CLEAR_OFFSET);
+
+ verify(sender).accept(clearCommandCaptor.capture());
+ ClearCommand clearCommand = clearCommandCaptor.getValue();
+ assertThat(clearCommand.getOffset(), equalTo(CLEAR_OFFSET));
+ assertThat(clearCommand.getPubAgentName(), equalTo(PUB1AGENT1));
+ assertThat(clearCommand.getSubAgentName(), equalTo(SUBAGENT_NAME_1));
+ assertThat(clearCommand.getSubSlingId(), equalTo(SLINGID1));
+ }
+
+ private TopologyView createTopologyView() {
+ State state = new State(PUB1AGENT1, SUBAGENT_ID1, 0,
+ CURRENT_OFFSET, HEAD_RETRIES, MAX_RETRIES, true);
+ return new TopologyView(Collections.singleton(state));
+ }
+
+ private void simulateMessage(int offset) {
+ FullMessage<PackageMessage> message = RangePollerTest.createMessage(ReqType.ADD, offset);
+ handlerCaptor.getValue().getHandler().handle(message.getInfo(), message.getMessage());
+ }
+
+ private List<FullMessage<PackageMessage>> fetch() {
+ try {
+ return callback.fetchRange(10l, 20l);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException();
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index b41029c..60670b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -19,38 +19,49 @@
package org.apache.sling.distribution.journal.impl.publisher;
import static java.util.Arrays.asList;
-import static org.mockito.Matchers.any;
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
-import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
+import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.impl.discovery.State;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
+import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
+import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.Spy;
import org.osgi.service.event.EventAdmin;
public class PackageDistributedNotifierTest {
@Mock
- private PubQueueCacheService pubQueueCacheService;
+ private PubQueueProvider pubQueueCacheService;
@Mock
private OffsetQueue<DistributionQueueItem> offsetQueue;
- @Mock
+ @Spy
private Topics topics;
@Mock
@@ -59,29 +70,42 @@ public class PackageDistributedNotifierTest {
@Mock
private EventAdmin eventAdmin;
+ @InjectMocks
+ private PackageDistributedNotifier notifier;
+
+ @Mock
+ private MessageSender<Object> sender;
+
+ @Captor
+ private ArgumentCaptor<PackageDistributedMessage> messageCaptor;
+
@Before
public void before() {
initMocks(this);
when(offsetQueue.getItem(anyLong()))
- .thenReturn(new DistributionQueueItem("packageId", Collections.emptyMap()));
+ .thenReturn(createItem());
when(pubQueueCacheService.getOffsetQueue(anyString(), anyLong()))
.thenReturn(offsetQueue);
+ when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic())))
+ .thenReturn(sender);
}
-
@Test
public void testChanged() throws Exception {
- PackageDistributedNotifier notifier = Mockito.spy(new PackageDistributedNotifier());
- setInternalState(notifier, "pubQueueCacheService", pubQueueCacheService);
- setInternalState(notifier, "eventAdmin", eventAdmin);
- setInternalState(notifier, "messagingProvider", messagingProvider);
- setInternalState(notifier, "topics", topics);
notifier.activate();
TopologyViewDiff diffView = new TopologyViewDiff(
buildView(new State("pub1", "sub1", 1000, 10, 0, -1, false)),
buildView(new State("pub1", "sub1", 2000, 13, 0, -1, false)));
notifier.changed(diffView);
- verify(notifier, times(3)).processOffset(anyString(), any(DistributionQueueItem.class));
+ verify(sender, times(3)).accept(messageCaptor.capture());
+ }
+
+ private DistributionQueueItem createItem() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(QueueItemFactory.RECORD_OFFSET, 10l);
+ properties.put(PROPERTY_REQUEST_PATHS, new String[] {"/test"});
+ properties.put(PROPERTY_REQUEST_DEEP_PATHS, new String[] {"/test"});
+ return new DistributionQueueItem("packageId", properties);
}
private TopologyView buildView(State ... state) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java
new file mode 100644
index 0000000..00df5e6
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisherTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubQueueProviderPublisherTest {
+
+ @Mock
+ MessagingProvider messagingProvider;
+
+ @Mock
+ private BundleContext context;
+
+ @Mock
+ PubQueueProviderFactory pubQueueProviderFactory;
+
+ @Spy
+ Topics topics = new Topics();
+
+ @InjectMocks
+ private PubQueueProviderPublisher pubQueueProviderPublisher;
+
+ @Mock
+ private PubQueueProvider pubQueueProvider;
+
+ @Mock
+ private ServiceRegistration<PubQueueProvider> reg;
+
+ @Test
+ public void testCycle() throws IOException {
+ when(pubQueueProviderFactory.create(Mockito.any())).thenReturn(pubQueueProvider);
+ when(context.registerService(Mockito.eq(PubQueueProvider.class), Mockito.eq(pubQueueProvider), Mockito.any()))
+ .thenReturn(reg);
+ pubQueueProviderPublisher.activate(context);
+
+ pubQueueProviderPublisher.deactivate();
+ verify(pubQueueProvider).close();
+ verify(reg).unregister();
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
index 29a5133..8a3cf76 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.timeout;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
index 6d82121..217bd59 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
@@ -107,7 +107,7 @@ public class RangePollerTest {
}
}
- private FullMessage<PackageMessage> createMessage(ReqType reqType, int offset) {
+ public static FullMessage<PackageMessage> createMessage(ReqType reqType, long offset) {
MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis());
PackageMessage message = PackageMessage.builder()
.pubAgentName("agent1")
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
deleted file mode 100644
index 9fe877a..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.shared.PackageRetries;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class SubQueueTest {
-
- @Test
- public void testGetName() throws Exception {
- String queueName = "someQueue";
- SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
- Assert.assertEquals(queueName, queue.getName());
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testAdd() throws Exception {
- SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
- queue.add(buildQueueItem("package-1"));
- }
-
- @Test
- public void testGetHead() throws Exception {
- SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
- Assert.assertNull(emptyQueue.getHead());
- SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
- Assert.assertNotNull(oneQueue.getHead());
- }
-
- @Test
- public void testGetItems() throws Exception {
- SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
- Assert.assertNotNull(oneQueue.getEntries(0, 10));
- SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
- Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
- Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
- Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
- }
-
- private DistributionQueueItem buildQueueItem(String packageId) {
- Map<String, Object> properties = new HashMap<>();
- properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
- properties.put(QueueItemFactory.RECORD_OFFSET, 0);
- properties.put(QueueItemFactory.RECORD_PARTITION, 0);
- properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
- return new DistributionQueueItem(packageId, properties);
- }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index fd4e799..219a561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -96,6 +96,7 @@ import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
@@ -228,10 +229,10 @@ public class SubscriberTest {
initSubscriber(ImmutableMap.of("agentNames", "dummy"));
assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
- MessageInfo info = new TestMessageInfo("", 1, 100, 0);
+ MessageInfo info = createInfo(100);
PackageMessage message = BASIC_ADD_PACKAGE;
-
packageHandler.handle(info, message);
+
verify(packageBuilder, timeout(1000).times(0)).installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class));
assertThat(getStoredOffset(), nullValue());
@@ -246,14 +247,13 @@ public class SubscriberTest {
assumeNoPrecondition();
initSubscriber();
assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
-
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
- PackageMessage message = BASIC_ADD_PACKAGE;
+
final Semaphore sem = new Semaphore(0);
- when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
- Mockito.any(ByteArrayInputStream.class))
- ).thenAnswer(new WaitFor(sem));
+ whenInstallPackage()
+ .thenAnswer(new WaitFor(sem));
+ MessageInfo info = createInfo(0l);
+ PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
@@ -267,15 +267,13 @@ public class SubscriberTest {
public void testReceiveDelete() throws DistributionException, LoginException, PersistenceException {
assumeNoPrecondition();
initSubscriber();
+ final Semaphore sem = new Semaphore(0);
+ whenInstallPackage()
+ .thenAnswer(new WaitFor(sem));
createResource("/test");
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+ MessageInfo info = createInfo(0l);
PackageMessage message = BASIC_DEL_PACKAGE;
- final Semaphore sem = new Semaphore(0);
- when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
- Mockito.any(ByteArrayInputStream.class))
- ).thenAnswer(new WaitFor(sem));
-
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
@@ -289,13 +287,11 @@ public class SubscriberTest {
public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();
initSubscriber(ImmutableMap.of("maxRetries", "1"));
+ whenInstallPackage()
+ .thenThrow(new RuntimeException("Expected"));
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+ MessageInfo info = createInfo(0l);
PackageMessage message = BASIC_ADD_PACKAGE;
- when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
- Mockito.any(ByteArrayInputStream.class))
- ).thenThrow(new RuntimeException("Expected"));
-
packageHandler.handle(info, message);
verify(statusSender, timeout(10000).times(1)).accept(anyObject());
@@ -307,9 +303,8 @@ public class SubscriberTest {
// Only editable subscriber will send status
initSubscriber(ImmutableMap.of("editable", "true"));
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+ MessageInfo info = createInfo(0l);
PackageMessage message = BASIC_ADD_PACKAGE;
-
packageHandler.handle(info, message);
waitSubscriber(IDLE);
@@ -320,9 +315,9 @@ public class SubscriberTest {
public void testSkipBecauseOfPrecondition() throws DistributionException, InterruptedException, TimeoutException {
when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.SKIP);
initSubscriber(ImmutableMap.of("editable", "true"));
- MessageInfo info = new TestMessageInfo("", 1, 11, 0);
- PackageMessage message = BASIC_ADD_PACKAGE;
+ MessageInfo info = createInfo(11l);
+ PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
await().until(this::getStatus, equalTo(PackageStatusMessage.Status.REMOVED));
@@ -333,12 +328,13 @@ public class SubscriberTest {
public void testPreconditionTimeoutExceptionBecauseOfShutdown() throws DistributionException, InterruptedException, TimeoutException, IOException {
when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.WAIT);
initSubscriber(ImmutableMap.of("editable", "true"));
- MessageInfo info = new TestMessageInfo("", 1, 11, 0);
- PackageMessage message = BASIC_ADD_PACKAGE;
-
long startedAt = System.currentTimeMillis();
+
+ MessageInfo info = createInfo(11l);
+ PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
subscriber.deactivate();
+
assertThat("After deactivate precondition should time out quickly.", System.currentTimeMillis() - startedAt, lessThan(1000l));
}
@@ -347,15 +343,24 @@ public class SubscriberTest {
Semaphore sem = new Semaphore(0);
assumeWaitingForPrecondition(sem);
initSubscriber();
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
- PackageMessage message = BASIC_ADD_PACKAGE;
+ MessageInfo info = createInfo(0l);
+ PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
+
waitSubscriber(RUNNING);
await("Should report ready").until(() -> subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
sem.release();
}
+ private OngoingStubbing<DistributionPackageInfo> whenInstallPackage() throws DistributionException {
+ return when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), Mockito.any(ByteArrayInputStream.class)));
+ }
+
+ private TestMessageInfo createInfo(long offset) {
+ return new TestMessageInfo("", 1, offset, 0);
+ }
+
private Long getStoredOffset() {
LocalStore store = new LocalStore(resolverFactory, BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
return store.load(BookKeeper.KEY_OFFSET, Long.class);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
similarity index 87%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
index d494392..b12b494 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/QueueItemFactoryTest.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue;
+package org.apache.sling.distribution.journal.queue;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_PACKAGE_TYPE;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_PATHS;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
index 6bc7ede..b884a79 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/EntryUtilTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static org.junit.Assert.assertEquals;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
index 353da42..6c07b0d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplJMXTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -33,7 +33,7 @@ import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
index 4656e51..ebfedf5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/OffsetQueueImplTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertThat;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
similarity index 64%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index aca7d01..266d54d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -16,43 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.stream.LongStream;
import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -82,35 +75,20 @@ public class PubQueueCacheTest {
private static final Random RAND = new Random();
@Captor
- private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+ private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
@Mock
private EventAdmin eventAdmin;
@Mock
- private MessagingProvider clientProvider;
-
- @Mock
- private QueueCacheSeeder cacheSeeder;
-
- @Mock
- private DistributionMetricsService distributionMetricsService;
+ private CacheCallback callback;
@Mock
private Counter counter;
@Mock
- private LocalStore seedStore;
-
- @Mock
private Closeable poller;
- @Mock
- private MessageSender<Object> sender;
-
- @Mock
- private QueueCacheSeeder seeder;
-
private PubQueueCache cache;
private ExecutorService executor;
@@ -120,29 +98,12 @@ public class PubQueueCacheTest {
@Before
public void before() {
- when(clientProvider.createPoller(
- eq(TOPIC),
- eq(Reset.latest),
- handlerCaptor.capture()))
+ when(callback.createConsumer(handlerCaptor.capture()))
.thenReturn(poller);
- when(clientProvider.createPoller(
- eq(TOPIC),
- eq(Reset.earliest),
- Mockito.anyString(),
- handlerCaptor.capture()))
- .thenReturn(poller);
- when(clientProvider.createSender(Mockito.anyString()))
- .thenReturn(sender);
-
- when(distributionMetricsService.getQueueCacheFetchCount())
- .thenReturn(counter);
- when(seedStore.load(anyString(), any())).thenReturn(0L);
-
- cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder);
- verify(seeder).startSeeding();
+ cache = new PubQueueCache(eventAdmin, callback);
executor = Executors.newFixedThreadPool(10);
- tailHandler = handlerCaptor.getValue().getHandler();
+ tailHandler = handlerCaptor.getValue();
}
@After
@@ -165,21 +126,13 @@ public class PubQueueCacheTest {
@Test
public void testFetchWithSingleConsumer() throws Exception {
- simulateMessage(tailHandler, 200);
+ simulateMessage(tailHandler, 200l);
+ when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+ .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD)));
Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
- // seeding the cache with a message at offset 200
- // wait that the consumer has started fetching the offsets from 100 to 200
- MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
- // simulate messages for the fetched offsets
- simulateMessages(headHandler, 100, cache.getMinOffset());
// the consumer returns the offset queue
consumer.get(15, SECONDS);
- assertEquals(100, cache.getMinOffset());
- }
-
- private MessageHandler<PackageMessage> awaitHeadHandler() {
- return Awaitility.await().ignoreExceptions()
- .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+ assertEquals(100l, cache.getMinOffset());
}
@Test
@@ -188,18 +141,15 @@ public class PubQueueCacheTest {
// build two consumers for same agent queue, from offset 100
Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
- // seeding the cache with a message at offset 200
- // wait that one consumer has started fetching the offsets from 100 to 200
- MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
- // simulate messages for the fetched offsets
- simulateMessages(headHandler, 100, cache.getMinOffset());
- // both consumers returns the offset queue
+ when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+ .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD)));
OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
assertEquals(q1.getSize(), q2.getSize());
assertEquals(100, cache.getMinOffset());
- // the offsets have been fetched only once
- assertEquals(2, handlerCaptor.getAllValues().size());
+
+ // Fetch should only happen once
+ verify(callback, times(1)).fetchRange(Mockito.anyLong(), Mockito.anyLong());
}
@Test
@@ -213,11 +163,6 @@ public class PubQueueCacheTest {
assertEquals(4, cache.size());
}
- private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
- LongStream.rangeClosed(fromOffset, toOffset)
- .forEach(offset -> simulateMessage(handler, offset));
- }
-
private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) {
simulateMessage(handler,
pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3),
@@ -225,19 +170,31 @@ public class PubQueueCacheTest {
}
private void simulateMessage(MessageHandler<PackageMessage> handler, String pubAgentName, ReqType reqType, long offset) {
- PackageMessage msg = PackageMessage.builder()
+ PackageMessage msg = createTestMessage(pubAgentName, reqType);
+ simulateMessage(handler, msg, offset);
+ }
+
+ private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) {
+ log.info("Simulate msg @ offset {}", offset);
+ handler.handle(createInfo(offset), msg);
+ }
+
+ private FullMessage<PackageMessage> createTestMessage(long offset, String pubAgentName, ReqType reqType) {
+ return new FullMessage<>(createInfo(offset), createTestMessage(pubAgentName, reqType));
+ }
+
+ private PackageMessage createTestMessage(String pubAgentName, ReqType reqType) {
+ return PackageMessage.builder()
.pkgType("pkgType")
.pkgId(UUID.randomUUID().toString())
.pubSlingId("pubSlingId")
.reqType(reqType)
.pubAgentName(pubAgentName)
.build();
- simulateMessage(handler, msg, offset);
}
- private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) {
- log.info("Simulate msg @ offset {}", offset);
- handler.handle(new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis()), msg);
+ private MessageInfo createInfo(long offset) {
+ return new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis());
}
Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, long minOffset) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
similarity index 74%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index da13e6b..f462f56 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -29,6 +30,7 @@ import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
+import java.util.UUID;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
@@ -49,6 +51,8 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.queue.CacheCallback;
+import org.apache.sling.distribution.journal.queue.QueueState;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -61,13 +65,14 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
import org.osgi.service.event.EventAdmin;
public class PubQueueProviderTest {
private static final String PUB1_AGENT_NAME = "pub1";
private static final String PUB2_AGENT_NAME = "pub2";
- private static final String SUB_SLING_ID = "sub1sling";
+ private static final String SUB_SLING_ID = UUID.randomUUID().toString();
private static final String SUB_AGENT_NAME = "sub1";
private static final String SUB_AGENT_ID = SUB_SLING_ID +"-" + SUB_AGENT_NAME;
@@ -76,7 +81,7 @@ public class PubQueueProviderTest {
private MessagingProvider clientProvider;
@Captor
- private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+ private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
@Captor
private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor;
@@ -93,10 +98,13 @@ public class PubQueueProviderTest {
@Mock
private MessageSender<Object> sender;
- private PubQueueCacheService pubQueueCacheService;
+ @Mock
+ private CacheCallback callback;
+
+ @Mock
+ private BundleContext context;
private MessageHandler<PackageMessage> handler;
- private MessageHandler<PackageStatusMessage> statHandler;
private PubQueueProviderImpl queueProvider;
private MBeanServer mbeanServer;
@@ -104,49 +112,43 @@ public class PubQueueProviderTest {
@Before
public void before() throws PersistenceException {
MockitoAnnotations.initMocks(this);
- when(clientProvider.createPoller(
- Mockito.eq(Topics.PACKAGE_TOPIC),
- Mockito.any(Reset.class),
- handlerCaptor.capture()))
- .thenReturn(poller);
+ when(callback.createConsumer(handlerCaptor.capture()))
+ .thenReturn(poller);
when(clientProvider.createPoller(
Mockito.eq(Topics.STATUS_TOPIC),
Mockito.any(Reset.class),
statHandlerCaptor.capture()))
.thenReturn(statPoller);
- when(clientProvider.createSender(Mockito.anyString()))
- .thenReturn(sender);
- Topics topics = new Topics();
- pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
- pubQueueCacheService.activate();
- queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
- queueProvider.activate();
- handler = handlerCaptor.getValue().getHandler();
- statHandler = statHandlerCaptor.getValue().getHandler();
+ queueProvider = new PubQueueProviderImpl(eventAdmin, callback, context);
+ handler = handlerCaptor.getValue();
}
@After
public void after() throws IOException {
- pubQueueCacheService.deactivate();
- queueProvider.deactivate();
- verify(poller).close();
- verify(statPoller).close();
+ queueProvider.close();
+ verify(poller, atLeast(1)).close();
}
@Test
public void test() throws Exception {
- handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
- handler.handle(info(1L), packageMessage("packageid2", PUB2_AGENT_NAME));
- handler.handle(info(2L), packageMessage("packageid3", PUB1_AGENT_NAME));
+ handler.handle(info(1L), packageMessage("packageid1", PUB1_AGENT_NAME));
+ handler.handle(info(2L), packageMessage("packageid2", PUB2_AGENT_NAME));
+ handler.handle(info(3L), packageMessage("packageid3", PUB1_AGENT_NAME));
+
+ when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME), Mockito.any()))
+ .thenReturn(new QueueState(0, -1, 0, null));
// Full pub1 queue contains all packages from pub1
- DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 0, -1, false);
+ DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID);
Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
// With offset 1 first package is removed
- DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 1, -1, false);
+ when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME), Mockito.any()))
+ .thenReturn(new QueueState(1, -1, 0, null));
+
+ DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID);
Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
assertThat(it2.hasNext(), equalTo(false));
@@ -155,13 +157,13 @@ public class PubQueueProviderTest {
Set<ObjectInstance> mbeans = mbeanServer.queryMBeans(new ObjectName("org.apache.sling.distribution:type=OffsetQueue,id="+PUB1_AGENT_NAME), null);
ObjectInstance mbean = mbeans.iterator().next();
assertThat(getAttrib(mbean, "Size"), equalTo(2));
- assertThat(getAttrib(mbean, "HeadOffset"), equalTo(0L));
- assertThat(getAttrib(mbean, "TailOffset"), equalTo(2L));
+ assertThat(getAttrib(mbean, "HeadOffset"), equalTo(1L));
+ assertThat(getAttrib(mbean, "TailOffset"), equalTo(3L));
}
@Test
public void testEmptyErrorQueue() throws Exception {
- DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID + "-error");
assertThat(queue.getStatus().getItemsCount(), equalTo(0));
}
@@ -174,14 +176,36 @@ public class PubQueueProviderTest {
MessageInfo info = info(1L);
handler.handle(info, pkgMsg1);
PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
- statHandler.handle(info, statusMsg1);
+ queueProvider.handleStatus(info, statusMsg1);
- DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_AGENT_ID + "-error");
assertThat(queue.getStatus().getItemsCount(), equalTo(1));
DistributionQueueEntry head = queue.getHead();
DistributionQueueItem item = head.getItem();
assertThat(item.getPackageId(), equalTo("packageid1"));
}
+
+ @Test
+ public void testCleanUp() {
+ handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
+
+ assertThat(queueSize(), equalTo(1));
+ queueProvider.run();
+ assertThat(queueSize(), equalTo(1));
+
+ for (long c=0; c<10001;c++) {
+ handler.handle(info(c), packageMessage("packageid" + c, PUB1_AGENT_NAME));
+ }
+ assertThat(queueSize(), equalTo(10001));
+ queueProvider.run();
+ handler = handlerCaptor.getValue();
+ handler.handle(info(0L), packageMessage("packageid1", PUB1_AGENT_NAME));
+ assertThat(queueSize(), equalTo(1));
+ }
+
+ private int queueSize() {
+ return queueProvider.getOffsetQueue(PUB1_AGENT_NAME, 0).getSize();
+ }
private MessageInfo info(long offset) {
MessageInfo info = Mockito.mock(MessageInfo.class);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
rename to src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index 28b5320..8ae13b4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.queue.impl;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
similarity index 98%
rename from src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
index f7a4fb8..6822ba0 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/AgentIdTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/AgentIdTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.publisher;
+package org.apache.sling.distribution.journal.shared;
import static org.junit.Assert.assertEquals;