You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/04/07 12:51:16 UTC
[sling-org-apache-sling-distribution-journal] 06/06: Revert
"SLING-9259 - Extract service subscriber code into separate package (#25)"
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 03d8900203f71ba55759a4523c79587d65c60062
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 7 14:41:28 2020 +0200
Revert "SLING-9259 - Extract service subscriber code into separate package (#25)"
This reverts commit 9f2175a3
---
.../impl/precondition/DefaultPrecondition.java | 4 +-
.../impl/precondition/PackageStatusWatcher.java | 2 +-
.../journal/impl/precondition/Precondition.java | 3 +-
.../impl/precondition/StagingPrecondition.java | 32 ++-
.../journal/impl/publisher/DiscoveryService.java | 2 +-
.../impl/publisher/DistributionPublisher.java | 15 +-
.../impl/publisher/PackageDistributedNotifier.java | 2 +-
.../journal/impl/publisher/PackageRepo.java | 4 +-
.../{shared => impl/queue/impl}/EntryUtil.java | 2 +-
.../queue/impl}/PackageRetries.java | 4 +-
.../journal/impl/queue/impl/PubErrQueue.java | 2 -
.../journal/impl/queue/impl/PubQueue.java | 2 -
.../journal/impl/queue/impl/PubQueueCache.java | 8 +-
.../impl/queue/impl/PubQueueCacheService.java | 4 +-
.../impl/queue/impl/PubQueueProviderImpl.java | 2 +-
.../queue/impl}/QueueEntryFactory.java | 2 +-
.../queue/impl/{PubErrQueue.java => SubQueue.java} | 103 +++++---
.../journal/{ => impl}/shared/AgentState.java | 2 +-
.../{ => impl}/shared/DefaultDistributionLog.java | 2 +-
.../shared/DistributionMetricsService.java | 114 ++++++++-
.../{ => impl}/shared/ExponentialBackOff.java | 2 +-
.../journal/{ => impl}/shared/JMXRegistration.java | 2 +-
.../{ => impl}/shared/JournalAvailableChecker.java | 4 +-
.../shared/JournalAvailableServiceMarker.java | 2 +-
.../journal/{ => impl}/shared/LimitPoller.java | 2 +-
.../journal/{ => impl}/shared/PackageBrowser.java | 2 +-
.../{ => impl}/shared/PackageViewerPlugin.java | 2 +-
.../shared/SimpleDistributionResponse.java | 2 +-
.../journal/{ => impl}/shared/Topics.java | 2 +-
.../journal/impl/subscriber/Announcer.java | 1 -
.../{service => impl}/subscriber/BookKeeper.java | 85 +++----
.../journal/impl/subscriber/CommandPoller.java | 2 +-
.../subscriber/ContentPackageExtractor.java | 4 +-
.../impl/subscriber/DistributionSubscriber.java | 269 ++++++++++++++-------
.../{service => impl}/subscriber/LocalStore.java | 4 +-
.../subscriber/PackageHandler.java | 45 +---
.../subscriber/PackageHandling.java | 2 +-
.../subscriber/PreConditionTimeoutException.java | 28 ---
.../impl/subscriber/SubscriberConfiguration.java | 1 -
.../subscriber/SubscriberIdle.java | 9 +-
.../service/subscriber/BookKeeperFactory.java | 90 -------
.../service/subscriber/ImportedEventFactory.java | 55 -----
.../journal/service/subscriber/NoopMetric.java | 99 --------
.../service/subscriber/SubscriberMetrics.java | 239 ------------------
.../impl/precondition/DefaultPreconditionTest.java | 5 +-
.../precondition/PackageStatusWatcherTest.java | 4 +-
.../impl/precondition/StagingPreconditionTest.java | 56 +++--
.../impl/publisher/DiscoveryServiceTest.java | 4 +-
.../impl/publisher/DistPublisherJMXTest.java | 3 +-
.../impl/publisher/DistributionPublisherTest.java | 4 +-
.../publisher/PackageDistributedNotifierTest.java | 2 +-
.../journal/impl/publisher/PackageRepoTest.java | 4 +-
.../journal/impl/queue/QueueItemFactoryTest.java | 2 +-
.../journal/impl/queue/impl/EntryUtilTest.java | 1 -
.../impl/queue/impl/OffsetQueueImplJMXTest.java | 2 +-
.../queue/impl}/PackageRetriesTest.java | 4 +-
.../journal/impl/queue/impl/PubQueueCacheTest.java | 4 +-
.../impl/queue/impl/PubQueueProviderTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueTest.java | 1 -
.../journal/impl/queue/impl/RangePollerTest.java | 2 +-
.../journal/impl/queue/impl/SubQueueTest.java | 72 ++++++
.../shared/DistributionMetricsServiceTest.java | 14 +-
.../{ => impl}/shared/ExponentialBackoffTest.java | 3 +-
.../shared/JournalAvailableCheckerTest.java | 11 +-
.../journal/{ => impl}/shared/LimitPollerTest.java | 4 +-
.../{ => impl}/shared/PackageBrowserTest.java | 3 +-
.../{ => impl}/shared/PackageViewerPluginTest.java | 5 +-
.../shared/SimpleDistributionResponseTest.java | 3 +-
.../journal/{ => impl}/shared/TestMessageInfo.java | 2 +-
.../journal/impl/subscriber/AnnouncerTest.java | 2 -
.../subscriber/BookKeeperTest.java | 46 +---
.../journal/impl/subscriber/CommandPollerTest.java | 4 +-
.../subscriber/ContentPackageExtractorTest.java | 3 +-
.../subscriber/LocalStoreTest.java | 3 +-
.../subscriber/SubscriberIdleTest.java | 3 +-
.../journal/impl/subscriber/SubscriberTest.java | 170 +++++++++----
76 files changed, 760 insertions(+), 952 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index 5144378..dceed02 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
@Component(immediate = true, service = Precondition.class, property = { "name=default" })
public class DefaultPrecondition implements Precondition {
@Override
- public Decision canProcess(String subAgentName, long pkgOffset) {
- return Decision.ACCEPT;
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
+ return true;
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 506cf55..0408e0d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
public class PackageStatusWatcher implements Closeable {
private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index d934431..3730475 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -33,7 +33,6 @@ public interface Precondition {
* @throws InterruptedException if the thread was interrupted and should shut down
* @return true if the package can be processed; otherwise it returns false.
*/
- Decision canProcess(String subAgentName, long pkgOffset);
+ boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
- enum Decision { ACCEPT, SKIP, WAIT};
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index ad6be9c..2272888 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,10 +21,12 @@ package org.apache.sling.distribution.journal.impl.precondition;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import java.util.concurrent.TimeoutException;
+
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -54,6 +56,8 @@ public class StagingPrecondition implements Precondition, Runnable {
private volatile PackageStatusWatcher watcher;
+ private volatile boolean running = true;
+
@Activate
public void activate() {
watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -63,15 +67,31 @@ public class StagingPrecondition implements Precondition, Runnable {
@Deactivate
public synchronized void deactivate() {
IOUtils.closeQuietly(watcher);
+ running = false;
}
@Override
- public Decision canProcess(String subAgentName, long pkgOffset) {
- Status status = getStatus(subAgentName, pkgOffset);
- if (status == null) {
- return Decision.WAIT;
+ public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
+ if (timeoutSeconds < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ // try to get the status for timeoutSeconds and then throw
+ for(int i=0; i < timeoutSeconds * 10; i++) {
+ Status status = getStatus(subAgentName, pkgOffset);
+
+ if (status != null) {
+ return status == Status.IMPORTED;
+ } else {
+ Thread.sleep(100);
+ }
+
+ if (!running) {
+ throw new InterruptedException("Staging precondition is shutting down");
+ }
}
- return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
+
+ throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
}
private synchronized Status getStatus(String subAgentName, long pkgOffset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index c201a42..af052a3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.commons.io.IOUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index c1c75d0..55787dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
import static java.util.stream.StreamSupport.stream;
+import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
import static org.apache.sling.distribution.DistributionRequestType.ADD;
import static org.apache.sling.distribution.DistributionRequestType.DELETE;
import static org.apache.sling.distribution.DistributionRequestType.TEST;
-import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
import java.util.Arrays;
import java.util.Collections;
@@ -47,6 +47,11 @@ import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
@@ -68,13 +73,9 @@ import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
+
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.AgentState;
-import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index e3dace3..591a0cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index 81a4fba..aee0c52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,6 +25,8 @@ import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.nodetype.NodeType;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.jackrabbit.api.ReferenceBinary;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.LoginException;
@@ -45,8 +47,6 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
index ed2d461..529fc52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.queue.impl;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
index 1466d85..23f9463 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.queue.impl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +27,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
* Holds package retries by agent name
*/
@ParametersAreNonnullByDefault
-class PackageRetries {
+public class PackageRetries {
// (pubAgentName x retries)
private final Map<String, Integer> pubAgentNameToRetries = new ConcurrentHashMap<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index db27714..aaa7b1c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,8 +26,6 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 197d0b1..9bfd4fd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,8 +40,6 @@ import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b794116..189513c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -41,6 +41,8 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
@@ -51,8 +53,6 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
@@ -176,9 +176,7 @@ public class PubQueueCache {
sender.send(topic, pkgMsg);
sleep(seedingDelayMs);
} catch (MessagingException e) {
- if (!(e.getCause() instanceof InterruptedException)) {
- LOG.warn(e.getMessage(), e);
- }
+ LOG.warn(e.getMessage(), e);
sleep(seedingDelayMs * 10);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 5709493..c5eeb8c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -22,9 +22,9 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index a72a358..cc9e839 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -30,11 +30,11 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
index 58a0717..47a9f25 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.queue.impl;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
similarity index 50%
copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
copy to src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
index db27714..5d34e6d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
@@ -18,46 +18,56 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
-import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueState;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Objects.requireNonNull;
+import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
+import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
+import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
+import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
+import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
@ParametersAreNonnullByDefault
-public class PubErrQueue implements DistributionQueue {
+public class SubQueue implements DistributionQueue {
private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
- private static final Logger LOG = LoggerFactory.getLogger(PubErrQueue.class);
- private final OffsetQueue<DistributionQueueItem> agentQueue;
+ @SuppressWarnings("unused")
+ private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
- private final QueueEntryFactory entryFactory;
+ private final DistributionQueueItem headItem;
- private final OffsetQueue<Long> errorQueue;
+ private final PackageRetries packageRetries;
private final String queueName;
- public PubErrQueue(String queueName, OffsetQueue<DistributionQueueItem> agentQueue, OffsetQueue<Long> errorQueue) {
- this.queueName = requireNonNull(queueName);
- this.agentQueue = requireNonNull(agentQueue);
- this.errorQueue = requireNonNull(errorQueue);
- this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
+ private final QueueEntryFactory entryFactory;
+
+ public SubQueue(String queueName,
+ @Nullable
+ DistributionQueueItem headItem,
+ PackageRetries packageRetries) {
+ this.headItem = headItem;
+ this.queueName = Objects.requireNonNull(queueName);
+ this.packageRetries = Objects.requireNonNull(packageRetries);
+ this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
}
@Nonnull
@@ -67,45 +77,37 @@ public class PubErrQueue implements DistributionQueue {
}
@Override
- public DistributionQueueEntry add(@Nonnull DistributionQueueItem distributionQueueItem) {
+ public DistributionQueueEntry add(DistributionQueueItem queueItem) {
throw new UnsupportedOperationException("Unsupported add operation");
}
@Override
+ @CheckForNull
public DistributionQueueEntry getHead() {
- Long refOffset = errorQueue.getHeadItem();
- if (refOffset != null) {
- DistributionQueueItem queueItem = agentQueue.getItem(refOffset);
- return entryFactory.create(queueItem);
- }
- return null;
+ return entryFactory.create(headItem);
}
@Nonnull
@Override
public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
- List<DistributionQueueEntry> entries = new ArrayList<>();
- for (long refOffset : errorQueue.getHeadItems(skip, limit)) {
- DistributionQueueItem queueItem = agentQueue.getItem(refOffset);
- if (queueItem != null) {
- entries.add(entryFactory.create(queueItem));
- } else {
- LOG.warn("queueItem at offset {} not found", refOffset);
- }
+ final List<DistributionQueueEntry> entries;
+ if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
+ entries = Collections.singletonList(entryFactory.create(headItem));
+ } else {
+ entries = Collections.emptyList();
}
- return entries;
+ return Collections.unmodifiableList(entries);
}
@Override
- public DistributionQueueEntry getEntry(@Nonnull String entryId) {
- DistributionQueueItem queueItem = agentQueue.getItem(EntryUtil.entryOffset(entryId));
- return (queueItem != null)
- ? entryFactory.create(queueItem)
+ public DistributionQueueEntry getEntry(String entryId) {
+ return (entryId.equals(EntryUtil.entryId(headItem)))
+ ? entryFactory.create(headItem)
: null;
}
@Override
- public DistributionQueueEntry remove(@Nonnull String entryId) {
+ public DistributionQueueEntry remove(String entryId) {
throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@@ -124,13 +126,29 @@ public class PubErrQueue implements DistributionQueue {
@Nonnull
@Override
public DistributionQueueStatus getStatus() {
- return new DistributionQueueStatus(errorQueue.getSize(), DistributionQueueState.PASSIVE);
+ final DistributionQueueState queueState;
+ final int itemsCount;
+ DistributionQueueEntry headEntry = getHead();
+ if (headEntry != null) {
+ itemsCount = 1;
+ DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
+ if (itemState == QUEUED) {
+ queueState = RUNNING;
+ } else {
+ queueState = BLOCKED;
+ }
+ } else {
+ itemsCount = 0;
+ queueState = IDLE;
+ }
+
+ return new DistributionQueueStatus(itemsCount, queueState);
}
- @Nonnull
@Override
+ @Nonnull
public DistributionQueueType getType() {
- return DistributionQueueType.ORDERED;
+ return ORDERED;
}
@Override
@@ -138,4 +156,9 @@ public class PubErrQueue implements DistributionQueue {
return false;
}
+ private int attempts(DistributionQueueItem queueItem) {
+ String entryId = EntryUtil.entryId(queueItem);
+ return packageRetries.get(entryId);
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
index 51d5466..858e8dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
index 80c3747..bb49514 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 4dd1167..f18c04c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.lang.String.format;
@@ -48,6 +48,8 @@ public class DistributionMetricsService {
public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
+ public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Reference
@@ -57,12 +59,30 @@ public class DistributionMetricsService {
private Timer cleanupPackageDuration;
+ private Histogram importedPackageSize;
+
private Histogram exportedPackageSize;
private Meter acceptedRequests;
private Meter droppedRequests;
+ private Counter itemsBufferSize;
+
+ private Timer removedPackageDuration;
+
+ private Timer removedFailedPackageDuration;
+
+ private Timer importedPackageDuration;
+
+ private Meter failedPackageImports;
+
+ private Timer sendStoredStatusDuration;
+
+ private Timer processQueueItemDuration;
+
+ private Timer packageDistributedDuration;
+
private Timer buildPackageDuration;
private Timer enqueuePackageDuration;
@@ -82,6 +102,16 @@ public class DistributionMetricsService {
buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
+
+ importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
+ itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
+ importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
+ removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
+ removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
+ failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
+ sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
+ processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
+ packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
}
/**
@@ -135,6 +165,15 @@ public class DistributionMetricsService {
}
/**
+ * Histogram of the imported content package size in Byte.
+ *
+ * @return a Sling Metrics histogram
+ */
+ public Histogram getImportedPackageSize() {
+ return importedPackageSize;
+ }
+
+ /**
* Histogram of the exported content package size in Bytes.
*
* @return a Sling Metrics histogram
@@ -162,6 +201,79 @@ public class DistributionMetricsService {
}
/**
+ * Counter of the package buffer size on the subscriber.
+ *
+ * @return a Sling Metrics counter
+ */
+ public Counter getItemsBufferSize() {
+ return itemsBufferSize;
+ }
+
+ /**
+ * Timer capturing the duration in ms of successful packages import operations.
+ *
+ * @return a Sling Metrics timer
+ */
+ public Timer getImportedPackageDuration() {
+ return importedPackageDuration;
+ }
+
+ /**
+ * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
+ *
+ * @return a Sling Metrics timer
+ */
+ public Timer getRemovedPackageDuration() {
+ return removedPackageDuration;
+ }
+
+ /**
+ * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
+ *
+ * @return a Sling Metrics timer
+ */
+ public Timer getRemovedFailedPackageDuration() {
+ return removedFailedPackageDuration;
+ }
+
+ /**
+ * Meter of failures to import packages.
+ *
+ * @return a Sling Metrics meter
+ */
+ public Meter getFailedPackageImports() {
+ return failedPackageImports;
+ }
+
+ /**
+ * Timer capturing the duration in ms of sending a stored package status.
+ *
+ * @return a Sling Metric timer
+ */
+ public Timer getSendStoredStatusDuration() {
+ return sendStoredStatusDuration;
+ }
+
+ /**
+ * Timer capturing the duration in ms of processing a queue item.
+ *
+ * @return a Sling Metric timer
+ */
+ public Timer getProcessQueueItemDuration() {
+ return processQueueItemDuration;
+ }
+
+ /**
+ * Timer capturing the duration in ms of distributing a distribution package.
+ * The timer starts when the package is enqueued and stops when the package is successfully imported.
+ *
+ * @return a Sling Metric timer
+ */
+ public Timer getPackageDistributedDuration() {
+ return packageDistributedDuration;
+ }
+
+ /**
* Timer capturing the duration in ms of building a content package
*
* @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index 48727a7..28f5aa8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
index a1e3924..e4f36c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 6cb5170..3582b36 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.util.Objects.requireNonNull;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
index 9478372..287f3fa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import java.util.Objects;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
index 18cf976..2bb30b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
index 72eb177..b9d1d2d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
index 8ac9ba5..30f8836 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
index eb0510c..e49700e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
index c9ce5e2..1f4d44a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index beeb62d..7ecd03b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,7 +32,6 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 001fafc..53a7245 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
@@ -31,7 +31,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -42,13 +41,14 @@ import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -85,11 +85,9 @@ public class BookKeeper implements Closeable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ResourceResolverFactory resolverFactory;
- private final SubscriberMetrics subscriberMetrics;
+ private final DistributionMetricsService distributionMetricsService;
private final PackageHandler packageHandler;
- public final SubscriberIdle subscriberIdle;
private final EventAdmin eventAdmin;
-
private final Consumer<PackageStatusMessage> sender;
private final boolean editable;
private final int maxRetries;
@@ -103,10 +101,9 @@ public class BookKeeper implements Closeable {
private GaugeService<Integer> retriesGauge;
private int skippedCounter = 0;
- BookKeeper(ResourceResolverFactory resolverFactory,
- SubscriberMetrics subscriberMetrics,
+ public BookKeeper(ResourceResolverFactory resolverFactory,
+ DistributionMetricsService distributionMetricsService,
PackageHandler packageHandler,
- SubscriberIdle subscriberIdle,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
String subAgentName,
@@ -114,12 +111,11 @@ public class BookKeeper implements Closeable {
boolean editable,
int maxRetries) {
this.packageHandler = packageHandler;
- this.subscriberIdle = subscriberIdle;
this.eventAdmin = eventAdmin;
- String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
- this.subscriberMetrics = subscriberMetrics;
- this.retriesGauge = subscriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+ String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+ this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
this.resolverFactory = resolverFactory;
+ this.distributionMetricsService = distributionMetricsService;
this.sender = sender;
this.subAgentName = subAgentName;
this.subSlingId = subSlingId;
@@ -132,31 +128,6 @@ public class BookKeeper implements Closeable {
this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
}
- public void processPackage(MessageInfo info, PackageMessage pkgMsg, BiPredicate<Long, PackageMessage> shouldRemove) throws Exception {
- try {
- sendStoredStatus();
- long offset = info.getOffset();
- boolean remove = shouldRemove.test(offset, pkgMsg);
- subscriberIdle.busy();
- if (remove) {
- removePackage(pkgMsg, offset);
- } else {
- long createdTime = info.getCreateTime();
- importPackage(pkgMsg, offset, createdTime);
- }
- } finally {
- subscriberIdle.idle();
- sendStoredStatus();
- }
- }
-
- public DistributionAgentState getState() {
- if (subscriberIdle.isIdle()) {
- return DistributionAgentState.IDLE;
- }
- return packageRetries.getSum() > 0 ? DistributionAgentState.BLOCKED : DistributionAgentState.RUNNING;
- }
-
/**
* We aim at processing the packages exactly once. Processing the packages
* exactly once is possible with the following conditions
@@ -173,11 +144,11 @@ public class BookKeeper implements Closeable {
* failing. For those packages importers, we aim at processing packages at least
* once, thanks to the order in which the content updates are applied.
*/
- private void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+ public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
log.info("Importing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
addPackageMDC(pkgMsg);
- try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time();
+ try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
@@ -185,10 +156,10 @@ public class BookKeeper implements Closeable {
}
storeOffset(importerResolver, offset);
importerResolver.commit();
- subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
- subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+ distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
+ distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
packageRetries.clear(pkgMsg.getPubAgentName());
- Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
+ Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
eventAdmin.postEvent(event);
} catch (LoginException | IOException | RuntimeException e) {
failure(pkgMsg, offset, e);
@@ -224,7 +195,7 @@ public class BookKeeper implements Closeable {
* @throws DistributionException if the package should be retried
*/
private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
- subscriberMetrics.getFailedPackageImports().mark();
+ distributionMetricsService.getFailedPackageImports().mark();
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
@@ -239,10 +210,10 @@ public class BookKeeper implements Closeable {
}
}
- private void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
+ public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
log.info("Removing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = subscriberMetrics.getRemovedPackageDuration().time();
+ Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -254,19 +225,17 @@ public class BookKeeper implements Closeable {
context.stop();
}
- public void skipPackage(long offset) {
+ public void skipPackage(long offset) throws LoginException, PersistenceException {
log.info("Skipping package at offset {}", offset);
if (shouldCommitSkipped()) {
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeOffset(resolver, offset);
resolver.commit();
- } catch (Exception e) {
- log.warn("");
}
}
}
- private synchronized boolean shouldCommitSkipped() {
+ public synchronized boolean shouldCommitSkipped() {
skippedCounter ++;
if (skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
skippedCounter = 1;
@@ -280,8 +249,8 @@ public class BookKeeper implements Closeable {
* Send status stored in a previous run if exists
* @throws InterruptedException
*/
- private void sendStoredStatus() throws InterruptedException {
- try (Timer.Context context = subscriberMetrics.getSendStoredStatusDuration().time()) {
+ public void sendStoredStatus() throws InterruptedException {
+ try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
PackageStatus status = new PackageStatus(statusStore.load());
boolean sent = status.sent;
int retry = 0;
@@ -317,7 +286,7 @@ public class BookKeeper implements Closeable {
log.info("Sent status message {}", pkgStatMsg);
}
- private void markStatusSent() {
+ public void markStatusSent() {
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
statusStore.store(resolver, "sent", true);
resolver.commit();
@@ -334,6 +303,10 @@ public class BookKeeper implements Closeable {
return packageRetries.get(pubAgentName);
}
+ public PackageRetries getPackageRetries() {
+ return packageRetries;
+ }
+
@Override
public void close() throws IOException {
IOUtils.closeQuietly(retriesGauge);
@@ -342,7 +315,7 @@ public class BookKeeper implements Closeable {
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
log.info("Removing failed distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = subscriberMetrics.getRemovedFailedPackageDuration().time();
+ Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 3a1df97..f9d047b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
index c626550..998f929 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import static java.util.Objects.requireNonNull;
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
* Each distribution package is inspected for possible content packages in /etc/packages.
* Such content packages are installed via the Packaging service.
*/
-class ContentPackageExtractor {
+public class ContentPackageExtractor {
private static final String PACKAGE_BASE_PATH = "/etc/packages/";
private final Logger log = LoggerFactory.getLogger(this.getClass());
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index a86cbc5..74fc5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,14 +24,21 @@ import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
import java.io.Closeable;
import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
@@ -39,25 +46,37 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
+import org.apache.sling.distribution.journal.impl.shared.AgentState;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
-import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
-import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -65,6 +84,7 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,18 +95,28 @@ import com.google.protobuf.GeneratedMessage;
* A Subscriber SCD agent which consumes messages produced by a
* {@code DistributionPublisher} agent.
*/
-@Component(service = DistributionSubscriber.class, immediate = true,
- property = { "announceDelay=10000" },
- configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = {}, immediate = true, property = {
+ "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
-public class DistributionSubscriber {
+public class DistributionSubscriber implements DistributionAgent {
private static final int PRECONDITION_TIMEOUT = 60;
static int RETRY_DELAY = 5000;
static int QUEUE_FETCH_DELAY = 1000;
private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+ private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
+
+ @Reference(name = "packageBuilder")
+ private DistributionPackageBuilder packageBuilder;
+
+ @Reference
+ private SlingSettingsService slingSettings;
+
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
@Reference
private MessagingProvider messagingProvider;
@@ -94,34 +124,33 @@ public class DistributionSubscriber {
private Topics topics;
@Reference
+ private EventAdmin eventAdmin;
+
+ @Reference
private JournalAvailable journalAvailable;
@Reference(name = "precondition")
private Precondition precondition;
- @Reference(name = "packageBuilder")
- private DistributionPackageBuilder packageBuilder;
+ @Reference
+ private DistributionMetricsService distributionMetricsService;
@Reference
- private SubscriberMetrics subscriberMetrics;
+ private Packaging packaging;
- @Reference
- private SlingSettingsService slingSettings;
+ SubscriberIdle subscriberIdle;
- @Reference
- BookKeeperFactory bookKeeperFactory;
-
private ServiceRegistration<DistributionAgent> componentReg;
private Closeable packagePoller;
private CommandPoller commandPoller;
- BookKeeper bookKeeper;
-
+ private BookKeeper bookKeeper;
+
// Use a bounded internal buffer to allow reading further packages while working
// on one at a time
- private final BlockingQueue<FullMessage<PackageMessage>> queueItemsBuffer = new LinkedBlockingQueue<>(8);
+ private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
private Set<String> queueNames = Collections.emptySet();
@@ -129,30 +158,39 @@ public class DistributionSubscriber {
private String subAgentName;
+ private String pkgType;
+
private volatile boolean running = true;
private volatile Thread queueProcessor;
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
+ String subSlingId = requireNonNull(slingSettings.getSlingId());
subAgentName = requireNonNull(config.name());
requireNonNull(config);
requireNonNull(context);
-
requireNonNull(packageBuilder);
- requireNonNull(subscriberMetrics);
requireNonNull(slingSettings);
+ requireNonNull(resolverFactory);
requireNonNull(messagingProvider);
requireNonNull(topics);
+ requireNonNull(eventAdmin);
requireNonNull(precondition);
- requireNonNull(bookKeeperFactory);
+ // Unofficial config (currently just for test)
+ Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+ subscriberIdle = new SubscriberIdle(context, idleMillies);
+
queueNames = getNotEmpty(config.agentNames());
- String subSlingId = requireNonNull(slingSettings.getSlingId());
int maxRetries = config.maxRetries();
boolean editable = config.editable();
- PackageHandling packageHandling = config.packageHandling();
- bookKeeper = bookKeeperFactory.create(packageBuilder, subAgentName, subSlingId, maxRetries, editable, packageHandling, sender(topics.getStatusTopic()));
+
+ ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
+ PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+ bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+ sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
+
long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
@@ -168,11 +206,14 @@ public class DistributionSubscriber {
announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
maxRetries, config.editable(), announceDelay);
+ pkgType = requireNonNull(packageBuilder.getType());
boolean errorQueueEnabled = (maxRetries >= 0);
String msg = format(
- "Started Subscriber agent %s at offset %s, subscribed to agent names %s editable %s maxRetries %s errorQueueEnabled %s",
- subAgentName, startOffset, queueNames, config.editable(), maxRetries, errorQueueEnabled);
+ "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
+ subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
LOG.info(msg);
+ Dictionary<String, Object> props = createServiceProps(config);
+ componentReg = context.registerService(DistributionAgent.class, this, props);
}
private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
@@ -184,10 +225,24 @@ public class DistributionSubscriber {
return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
}
+ private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put("name", config.name());
+ props.put("title", config.name());
+ props.put("details", config.name());
+ props.put("agentNames", config.agentNames());
+ props.put("editable", config.editable());
+ props.put("maxRetries", config.maxRetries());
+ props.put("packageBuilder.target", config.packageBuilder_target());
+ props.put("precondition.target", config.precondition_target());
+ props.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
+ return props;
+ }
+
@Deactivate
public void deactivate() {
componentReg.unregister();
- IOUtils.closeQuietly(announcer, bookKeeper,
+ IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
packagePoller, commandPoller);
running = false;
Thread interrupter = this.queueProcessor;
@@ -195,22 +250,71 @@ public class DistributionSubscriber {
interrupter.interrupt();
}
String msg = String.format(
- "Stopped Subscriber agent %s, subscribed to Publisher agent names %s",
- subAgentName, queueNames);
+ "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
+ subAgentName, queueNames, pkgType);
LOG.info(msg);
}
-
+
+ @Nonnull
+ @Override
+ public Iterable<String> getQueueNames() {
+ return queueNames;
+ }
+
+ @Override
+ public DistributionQueue getQueue(@Nonnull String queueName) {
+ DistributionQueueItem head = queueItemsBuffer.stream()
+ .filter(item -> isIn(queueName, item))
+ .findFirst()
+ .orElse(null);
+ return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
+ }
+
+ private boolean isIn(String queueName, DistributionQueueItem queueItem) {
+ PackageMessage packageMsg = queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
+ return queueName.equals(packageMsg.getPubAgentName());
+ }
+
+ @Nonnull
+ @Override
+ public DistributionLog getLog() {
+ return this::emptyDistributionLog;
+ }
+
+ private List<String> emptyDistributionLog() {
+ return Collections.emptyList();
+ }
+
@Nonnull
+ @Override
public DistributionAgentState getState() {
- return bookKeeper.getState();
+ return AgentState.getState(this);
}
-
+
+ @Nonnull
+ @Override
+ public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
+ return executeUnsupported(request);
+ }
+
+ @Nonnull
+ private DistributionResponse executeUnsupported(DistributionRequest request) {
+ String msg = format("Request type %s is not supported by this agent, expected one of %s",
+ request.getRequestType(), SUPPORTED_REQ_TYPES);
+ LOG.info(msg);
+ return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
+ }
+
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
if (shouldEnqueue(info, message)) {
- FullMessage<PackageMessage> queueItem = new FullMessage<PackageMessage>(info, message);
+ DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
enqueue(queueItem);
} else {
- bookKeeper.skipPackage(info.getOffset());
+ try {
+ bookKeeper.skipPackage(info.getOffset());
+ } catch (PersistenceException | LoginException e) {
+ LOG.info("Error marking message at offset {} as skipped", info.getOffset(), e);
+ }
}
}
@@ -219,19 +323,23 @@ public class DistributionSubscriber {
LOG.info("Skipping package for Publisher agent {} at offset {} (not subscribed)", message.getPubAgentName(), info.getOffset());
return false;
}
+ if (!pkgType.equals(message.getPkgType())) {
+ LOG.warn("Skipping package with type {} at offset {}", message.getPkgType(), info.getOffset());
+ return false;
+ }
return true;
}
-
+
/**
* We block here if the buffer is full in order to limit the number of binary
* packages fetched in memory. Note that each queued item contains the binary
* package to be imported.
*/
- private void enqueue(FullMessage<PackageMessage> queueItem) {
+ private void enqueue(DistributionQueueItem queueItem) {
try {
while (running) {
if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
- subscriberMetrics.getItemsBufferSize().increment();
+ distributionMetricsService.getItemsBufferSize().increment();
return;
}
}
@@ -241,7 +349,7 @@ public class DistributionSubscriber {
throw new RuntimeException();
}
}
-
+
private void processQueue() {
LOG.info("Started Queue processor");
while (!Thread.interrupted()) {
@@ -253,27 +361,37 @@ public class DistributionSubscriber {
}
LOG.info("Stopped Queue processor");
}
-
+
private void fetchAndProcessQueueItem() throws InterruptedException {
try {
- FullMessage<PackageMessage> item = blockingPeekQueueItem();
- try (Timer.Context context = subscriberMetrics.getProcessQueueItemDuration().time()) {
- bookKeeper.processPackage(item.getInfo(), item.getMessage(), this::shouldRemove);
+
+ bookKeeper.sendStoredStatus();
+ DistributionQueueItem item = blockingPeekQueueItem();
+
+ try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
+ processQueueItem(item);
+ } finally {
+ subscriberIdle.idle();
}
- queueItemsBuffer.remove();
- } catch (PreConditionTimeoutException e) {
+
+ } catch (TimeoutException e) {
+ /**
+ * Precondition timed out. We only log this on info level as it is no error
+ */
LOG.info(e.getMessage());
- retryDelay();
+ Thread.sleep(RETRY_DELAY);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
// Catch all to prevent processing from stopping
- LOG.warn("Error processing queue item", e);
- retryDelay();
+ LOG.error("Error processing queue item", e);
+ Thread.sleep(RETRY_DELAY);
}
}
-
- private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
+
+ private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
while (true) {
- FullMessage<PackageMessage> queueItem = queueItemsBuffer.peek();
+ DistributionQueueItem queueItem = queueItemsBuffer.peek();
if (queueItem != null) {
return queueItem;
} else {
@@ -282,38 +400,23 @@ public class DistributionSubscriber {
}
}
- private void retryDelay() {
- try {
- Thread.sleep(RETRY_DELAY);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private boolean shouldRemove(long offset, PackageMessage message) {
- if (commandPoller.isCleared(offset)) {
- return true;
+ private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
+ long offset = queueItem.get(RECORD_OFFSET, Long.class);
+ PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
+ boolean skip = shouldSkip(offset);
+ subscriberIdle.busy();
+ if (skip) {
+ bookKeeper.removePackage(pkgMsg, offset);
+ } else {
+ long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
+ bookKeeper.importPackage(pkgMsg, offset, createdTime);
}
- return waitPrecondition(offset);
+ queueItemsBuffer.remove();
+ distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean waitPrecondition(long offset) {
- Decision decision = Decision.WAIT;
- long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
- while (decision == Decision.WAIT && System.currentTimeMillis() < endTime) {
- decision = precondition.canProcess(subAgentName, offset);
- if (decision == Decision.WAIT) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- } else {
- return decision == Decision.SKIP ? true : false;
- }
- }
- throw new PreConditionTimeoutException("Timeout waiting for package offset " + offset + " on status topic.");
+ private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
+ return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index 0018641..c619d79 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
@@ -44,7 +44,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
@ParametersAreNonnullByDefault
-class LocalStore {
+public class LocalStore {
private static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
similarity index 64%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index ef832f3..a7148c4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -16,32 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
-import java.io.ByteArrayInputStream;
import java.io.InputStream;
-import java.util.Objects;
-
-import javax.annotation.Nonnull;
-import javax.jcr.Binary;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-import javax.jcr.ValueFactory;
import org.apache.commons.io.IOUtils;
-import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class PackageHandler {
+public class PackageHandler {
private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
private DistributionPackageBuilder packageBuilder;
@@ -74,38 +66,13 @@ class PackageHandler {
LOG.info("Importing paths {}",pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
- pkgStream = pkgStream(resolver, pkgMsg);
- if (canHandlePackage(pkgMsg)) {
- packageBuilder.installPackage(resolver, pkgStream);
- extractor.handle(resolver, pkgMsg.getPathsList());
- }
+ pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+ packageBuilder.installPackage(resolver, pkgStream);
+ extractor.handle(resolver, pkgMsg.getPathsList());
} finally {
IOUtils.closeQuietly(pkgStream);
}
- }
- private boolean canHandlePackage(PackageMessage pkgMsg) {
- return Objects.equals(packageBuilder.getType(), pkgMsg.getPkgType());
- }
-
- @Nonnull
- public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
- if (pkgMsg.hasPkgBinary()) {
- return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
- } else {
- String pkgBinRef = pkgMsg.getPkgBinaryRef();
- try {
- Session session = resolver.adaptTo(Session.class);
- if (session == null) {
- throw new DistributionException("Unable to get Oak session");
- }
- ValueFactory factory = session.getValueFactory();
- Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
- return binary.getStream();
- } catch (RepositoryException e) {
- throw new DistributionException(e.getMessage(), e);
- }
- }
}
private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
index a1130fc..55dc55e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
public enum PackageHandling {
Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
deleted file mode 100644
index 684ab9d..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.subscriber;
-
-public class PreConditionTimeoutException extends RuntimeException {
- public PreConditionTimeoutException(String msg) {
- super(msg);
- }
-
- private static final long serialVersionUID = 6286011641627241560L;
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 625a62b..937c30a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,7 +18,6 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index e72378d..a05b80c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import java.io.Closeable;
import java.util.Hashtable;
@@ -42,7 +42,6 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
private final int idleMillis;
private final AtomicBoolean isReady = new AtomicBoolean();
- private final AtomicBoolean idle = new AtomicBoolean();
private final ScheduledExecutorService executor;
private ScheduledFuture<?> schedule;
@@ -70,23 +69,17 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
* Called when processing of a message starts
*/
public synchronized void busy() {
- idle.set(false);
cancelSchedule();
}
public boolean isReady() {
return isReady.get();
}
-
- public boolean isIdle() {
- return idle.get();
- }
/**
* Called when processing of a message has finished
*/
public synchronized void idle() {
- idle.set(true);
if (!isReady.get()) {
cancelSchedule();
schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
deleted file mode 100644
index b0cdbcb..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map;
-import java.util.function.Consumer;
-
-import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
-
-@Component(service = BookKeeperFactory.class, immediate = true)
-public class BookKeeperFactory {
-
-
-
- @Reference
- private ResourceResolverFactory resolverFactory;
-
- @Reference
- private EventAdmin eventAdmin;
-
- @Reference
- private SubscriberMetrics subscriberMetrics;
-
- @Reference
- private Packaging packaging;
-
- private BundleContext context;
-
- private Integer idleMillies;
-
- public BookKeeperFactory() {
- }
-
- public BookKeeperFactory(ResourceResolverFactory resolverFactory,
- EventAdmin eventAdmin, SubscriberMetrics subscriberMetrics, Packaging packaging) {
- this.resolverFactory = resolverFactory;
- this.eventAdmin = eventAdmin;
- this.subscriberMetrics = subscriberMetrics;
- this.packaging = packaging;
- }
-
- public void activate(BundleContext context, Map<String, Object> properties) {
- this.context = context;
- this.idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
- requireNonNull(resolverFactory);
- requireNonNull(eventAdmin);
- requireNonNull(subscriberMetrics);
- }
-
- public BookKeeper create(
- DistributionPackageBuilder packageBuilder,
- String subAgentName,
- String subSlingId,
- int maxRetries,
- boolean editable,
- PackageHandling packageHandling,
- Consumer<PackageStatusMessage> sender
- ) {
- ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, packageHandling);
- PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
- SubscriberIdle subscriberIdle = new SubscriberIdle(context, this.idleMillies);
- return new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin,
- sender, subAgentName, subSlingId, editable, maxRetries);
- }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
deleted file mode 100644
index 37a4734..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
-import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
-import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.journal.messages.Messages;
-import org.osgi.service.event.Event;
-
-@ParametersAreNonnullByDefault
-class ImportedEventFactory {
-
- public static final String PACKAGE_ID = "distribution.package.id";
- private static final String KIND_IMPORTER = "importer";
-
- private ImportedEventFactory() {
- }
-
- public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
- String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
- Map<String, Object> props = new HashMap<>();
- props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
- props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
- props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
- props.put(DISTRIBUTION_PATHS, pathsList);
- props.put(PACKAGE_ID, pkgMsg.getPkgId());
- return new Event(IMPORTER_PACKAGE_IMPORTED, props);
- }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
deleted file mode 100644
index 375a792..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.Timer;
-
-enum NoopMetric implements Counter, Histogram, Timer, Meter{
- INSTANCE;
-
- @Override
- public long getCount() {
- return 0;
- }
-
- @Override
- public void increment() {
-
- }
-
- @Override
- public void decrement() {
-
- }
-
- @Override
- public void increment(long n) {
-
- }
-
- @Override
- public void decrement(long n) {
-
- }
-
- @Override
- public void mark() {
-
- }
-
- @Override
- public void mark(long n) {
-
- }
-
- @Override
- public void update(long duration, TimeUnit unit) {
-
- }
-
- @Override
- public Context time() {
- return NoopContext.INSTANCE;
- }
-
- @Override
- public void update(long value) {
-
- }
-
- @Override
- public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
- return null;
- }
-
- private enum NoopContext implements Context {
- INSTANCE;
-
- @Override
- public long stop() {
- return 0;
- }
-
- @Override
- public void close() {
-
- }
- }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
deleted file mode 100644
index f5bcae0..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.service.subscriber;
-
-import static java.lang.String.format;
-
-import java.io.Closeable;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.function.Supplier;
-
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Gauge;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.MetricsService;
-import org.apache.sling.commons.metrics.Timer;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(service = SubscriberMetrics.class)
-public class SubscriberMetrics {
-
- public static final String BASE_COMPONENT = "distribution.journal";
-
- public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
-
- private final Logger log = LoggerFactory.getLogger(this.getClass());
-
- @Reference
- private MetricsService metricsService;
-
- private Histogram importedPackageSize;
-
- private Counter itemsBufferSize;
-
- private Timer removedPackageDuration;
-
- private Timer removedFailedPackageDuration;
-
- private Timer importedPackageDuration;
-
- private Meter failedPackageImports;
-
- private Timer sendStoredStatusDuration;
-
- private Timer processQueueItemDuration;
-
- private Timer packageDistributedDuration;
-
- private BundleContext context;
-
- public SubscriberMetrics() {
- importedPackageSize = NoopMetric.INSTANCE;
- itemsBufferSize = NoopMetric.INSTANCE;
- importedPackageDuration = NoopMetric.INSTANCE;
- removedPackageDuration = NoopMetric.INSTANCE;
- removedFailedPackageDuration = NoopMetric.INSTANCE;
- failedPackageImports = NoopMetric.INSTANCE;
- sendStoredStatusDuration = NoopMetric.INSTANCE;
- processQueueItemDuration = NoopMetric.INSTANCE;
- packageDistributedDuration = NoopMetric.INSTANCE;
- }
-
- @Activate
- public void activate(BundleContext context) {
- this.context = context;
- importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
- itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
- importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
- removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
- removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
- failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
- sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
- processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
- packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
- }
-
- /**
- * Histogram of the imported content package size in Byte.
- *
- * @return a Sling Metrics histogram
- */
- public Histogram getImportedPackageSize() {
- return importedPackageSize;
- }
-
- /**
- * Counter of the package buffer size on the subscriber.
- *
- * @return a Sling Metrics counter
- */
- public Counter getItemsBufferSize() {
- return itemsBufferSize;
- }
-
- /**
- * Timer capturing the duration in ms of successful packages import operations.
- *
- * @return a Sling Metrics timer
- */
- public Timer getImportedPackageDuration() {
- return importedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedPackageDuration() {
- return removedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedFailedPackageDuration() {
- return removedFailedPackageDuration;
- }
-
- /**
- * Meter of failures to import packages.
- *
- * @return a Sling Metrics meter
- */
- public Meter getFailedPackageImports() {
- return failedPackageImports;
- }
-
- /**
- * Timer capturing the duration in ms of sending a stored package status.
- *
- * @return a Sling Metric timer
- */
- public Timer getSendStoredStatusDuration() {
- return sendStoredStatusDuration;
- }
-
- /**
- * Timer capturing the duration in ms of processing a queue item.
- *
- * @return a Sling Metric timer
- */
- public Timer getProcessQueueItemDuration() {
- return processQueueItemDuration;
- }
-
- /**
- * Timer capturing the duration in ms of distributing a distribution package.
- * The timer starts when the package is enqueued and stops when the package is successfully imported.
- *
- * @return a Sling Metric timer
- */
- public Timer getPackageDistributedDuration() {
- return packageDistributedDuration;
- }
-
- public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
- return new GaugeService<>(name, description, supplier);
- }
-
- private String getMetricName(String component, String name) {
- return format("%s.%s", component, name);
- }
-
- private Counter getCounter(String metricName) {
- return metricsService.counter(metricName);
- }
-
- private Timer getTimer(String metricName) {
- return metricsService.timer(metricName);
- }
-
- private Histogram getHistogram(String metricName) {
- return metricsService.histogram(metricName);
- }
-
- private Meter getMeter(String metricName) {
- return metricsService.meter(metricName);
- }
-
- public class GaugeService<T> implements Gauge<T>, Closeable {
-
- @SuppressWarnings("rawtypes")
- private ServiceRegistration<Gauge> reg;
- private final Supplier<T> supplier;
-
- private GaugeService(String name, String description, Supplier<T> supplier) {
- this.supplier = supplier;
- Dictionary<String, String> props = new Hashtable<>();
- props.put(Constants.SERVICE_DESCRIPTION, description);
- props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
- props.put(Gauge.NAME, name);
- if (context != null) {
- reg = context.registerService(Gauge.class, this, props);
- }
- }
-
- @Override
- public T getValue() {
- return supplier.get();
- }
-
- @Override
- public void close() {
- try {
- reg.unregister();
- } catch (Exception e) {
- log.warn("Error unregistering service", e);
- }
- }
- }
-
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index 7808b12..f279265 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -22,13 +22,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.junit.Test;
public class DefaultPreconditionTest {
@Test
public void testAlwaysTrue() {
- Decision decision = new DefaultPrecondition().canProcess("any", 100);
- assertThat(decision, equalTo(Decision.ACCEPT));
+ boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
+ assertThat(canProcess, equalTo(true));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 1484a9e..571d98a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
package org.apache.sling.distribution.journal.impl.precondition;
import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index a5e969b..eab0479 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,21 +19,26 @@
package org.apache.sling.distribution.journal.impl.precondition;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -84,28 +89,47 @@ public class StagingPreconditionTest {
statusHandler = statusCaptor.getValue().getHandler();
}
- @Test
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalTimeout() throws InterruptedException, TimeoutException {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
+ }
+
+ @Test(expected = TimeoutException.class)
public void testNotYetProcessed() throws InterruptedException, TimeoutException {
simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
- Decision res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT);
- assertThat(res, equalTo(Decision.WAIT));
-
- Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT);
- assertThat(res2, equalTo(Decision.WAIT));
+ boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
+ assertThat(res, equalTo(true));
+ // We got no package for this agent. So this should time out
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
}
@Test
+ public void testDeactivateDuringCanProcess() {
+ AtomicReference<Throwable> exHolder = new AtomicReference<>();
+ Thread th = new Thread(() -> {
+ try {
+ precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
+ } catch (Throwable t) {
+ exHolder.set(t);
+ }
+ });
+ th.start();
+ precondition.deactivate();
+ Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
+ assertThat(ex, instanceOf(InterruptedException.class));
+ }
+
+ @Test(expected = TimeoutException.class)
public void testCleanup() throws InterruptedException, TimeoutException {
simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
- Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
- assertThat(res, equalTo(Decision.ACCEPT));
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
// Cleanup
precondition.run();
- Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
- assertThat(res2, equalTo(Decision.WAIT));
+ // Should time out because after cleanup message is not present anymore
+ precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
}
@Test
@@ -114,9 +138,9 @@ public class StagingPreconditionTest {
simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
- assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), equalTo(Decision.SKIP));
- assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), equalTo(Decision.SKIP));
- assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), equalTo(Decision.ACCEPT));
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
+ assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
+ assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
}
private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 2b82486..5e7b76c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,6 +30,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,8 +40,6 @@ import org.mockito.Mockito;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index f52026c..2a41a2f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,7 +41,6 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -54,6 +53,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+
@SuppressWarnings("rawtypes")
public class DistPublisherJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 434c561..7ab2bdf 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index ab5d17e..acc91b0 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
import java.util.Collections;
import java.util.HashSet;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index 9b41463..a8fb467 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,6 +30,8 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -55,8 +57,6 @@ import org.mockito.Spy;
import org.osgi.framework.BundleContext;
import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index e3b5582..6aeb1ba 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
import static org.junit.Assert.assertThat;
import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.MessageInfo;
public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 5aededc..11a52f9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import java.util.HashMap;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index e7f9639..18cc957 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.junit.Test;
public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
index 4f2f0a4..6c33412 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
@@ -16,14 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.queue.impl;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
-
public class PackageRetriesTest {
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 5fca8a3..1113319 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 166ad1a..61c1389 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,6 +38,7 @@ import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import com.google.protobuf.GeneratedMessage;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -57,7 +58,6 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 211b9fc..20dccea 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,7 +37,6 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 3cef383..778bbaa 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,7 +41,6 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
new file mode 100644
index 0000000..d2784e9
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import com.google.common.collect.Lists;
+
+public class SubQueueTest {
+
+ @Test
+ public void testGetName() throws Exception {
+ String queueName = "someQueue";
+ SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
+ Assert.assertEquals(queueName, queue.getName());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testAdd() throws Exception {
+ SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
+ queue.add(buildQueueItem("package-1"));
+ }
+
+ @Test
+ public void testGetHead() throws Exception {
+ SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
+ Assert.assertNull(emptyQueue.getHead());
+ SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
+ Assert.assertNotNull(oneQueue.getHead());
+ }
+
+ @Test
+ public void testGetItems() throws Exception {
+ SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
+ Assert.assertNotNull(oneQueue.getEntries(0, 10));
+ SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
+ Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
+ Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
+ Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
+ }
+
+ private DistributionQueueItem buildQueueItem(String packageId) {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
+ properties.put(QueueItemFactory.RECORD_OFFSET, 0);
+ properties.put(QueueItemFactory.RECORD_PARTITION, 0);
+ properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
+ return new DistributionQueueItem(packageId, properties);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
index 50d5507..1049ef5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertNotNull;
@@ -32,8 +32,7 @@ import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -95,7 +94,16 @@ public class DistributionMetricsServiceTest {
assertNotNull(metrics.getDroppedRequests());
assertNotNull(metrics.getEnqueuePackageDuration());
assertNotNull(metrics.getExportedPackageSize());
+ assertNotNull(metrics.getFailedPackageImports());
+ assertNotNull(metrics.getImportedPackageDuration());
+ assertNotNull(metrics.getImportedPackageSize());
+ assertNotNull(metrics.getItemsBufferSize());
+ assertNotNull(metrics.getPackageDistributedDuration());
+ assertNotNull(metrics.getProcessQueueItemDuration());
assertNotNull(metrics.getQueueCacheFetchCount());
+ assertNotNull(metrics.getRemovedFailedPackageDuration());
+ assertNotNull(metrics.getRemovedPackageDuration());
+ assertNotNull(metrics.getSendStoredStatusDuration());
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index 8fe7ad9..bd44057 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.time.Duration.of;
import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,7 +27,6 @@ import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 0214b83..ce7f279 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@@ -35,12 +35,9 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
index 7fd0207..fed9fef 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.List;
+import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +42,6 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.LimitPoller;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
index ede1b69..14dcc19 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -36,7 +36,6 @@ import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.PackageBrowser;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
index f483c86..aa2947a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.containsString;
@@ -43,9 +43,6 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.PackageBrowser;
-import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
index 7202b6b..5d31bf6 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
import org.junit.Test;
public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
index 9145dfe..a2105e3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.shared;
+package org.apache.sling.distribution.journal.impl.shared;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index e669a82..05f8a88 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,9 +27,7 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.function.Consumer;
-import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.messages.Messages;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
similarity index 61%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 2ae2803..fd4c761 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -16,32 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.impl.subscriber.SubscriberTest;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.framework.BundleContext;
import org.osgi.service.event.EventAdmin;
@RunWith(MockitoJUnitRunner.class)
@@ -51,7 +43,8 @@ public class BookKeeperTest {
private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
- private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
+ @Mock
+ private DistributionMetricsService distributionMetricsService;
@Mock
private PackageHandler packageHandler;
@@ -61,18 +54,13 @@ public class BookKeeperTest {
@Mock
private Consumer<PackageStatusMessage> sender;
-
- @Mock
- BundleContext context;
private BookKeeper bookKeeper;
@Before
public void before() {
- SubscriberIdle subscriberIdle = new SubscriberIdle(context, 300);
- bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin, sender,
+ bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
"subAgentName", "subSlingId", true, 10);
- sem = new Semaphore(0);
}
@Test
@@ -90,29 +78,5 @@ public class BookKeeperTest {
assertThat(bookKeeper.loadOffset(), equalTo(20l));
}
}
-
- @Test
- public void testReadyWhenWatingForPrecondition() {
-
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
- PackageMessage message = SubscriberTest.BASIC_ADD_PACKAGE;
- Executors.newSingleThreadExecutor().execute(() -> {
- try {
- bookKeeper.processPackage(info, message, this::waitForSemaphore);
- } catch (Exception e) {
- }
- });
- await("Should report ready").until(bookKeeper.subscriberIdle::isReady);
- sem.release();
- }
-
- public boolean waitForSemaphore(long offset, PackageMessage msg) {
- try {
- return sem.tryAcquire(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- return false;
- }
- }
- private Semaphore sem;
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 0de3d2e..d6c774e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
index 096b632..0868cc8 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import static java.util.Collections.singletonList;
import static org.mockito.Mockito.verify;
@@ -37,7 +37,6 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.sling.MockSling;
import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
index 1a03199..11b346a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import java.util.HashMap;
import java.util.Map;
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index 6c40034..ef5f0d7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.service.subscriber;
+package org.apache.sling.distribution.journal.impl.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.felix.systemready.CheckStatus.State;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
import org.junit.Test;
import org.mockito.Mockito;
import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 4d0710c..1d47520 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,9 +21,12 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -37,39 +40,42 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
-import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
@@ -77,26 +83,34 @@ import org.awaitility.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
-import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@SuppressWarnings("unchecked")
-@RunWith(MockitoJUnitRunner.class)
public class SubscriberTest {
private static final String SUB1_SLING_ID = "sub1sling";
@@ -105,7 +119,7 @@ public class SubscriberTest {
private static final String PUB1_SLING_ID = "pub1sling";
private static final String PUB1_AGENT_NAME = "pub1agent";
- public static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
+ private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
.setPkgId("myid")
.setPubSlingId(PUB1_SLING_ID)
.setPubAgentName(PUB1_AGENT_NAME)
@@ -124,8 +138,6 @@ public class SubscriberTest {
.addAllPaths(Arrays.asList("/test"))
.build();
- @Mock
- Packaging packaging;
@Mock
private BundleContext context;
@@ -160,10 +172,8 @@ public class SubscriberTest {
@Mock
private MessageSender<PackageStatusMessage> statusSender;
- @Spy
- private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
-
- BookKeeperFactory bookKeeperFactory;
+ @Mock
+ private DistributionMetricsService distributionMetricsService;
@InjectMocks
DistributionSubscriber subscriber;
@@ -188,14 +198,12 @@ public class SubscriberTest {
Awaitility.setDefaultPollDelay(Duration.ZERO);
Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
-
- bookKeeperFactory = new BookKeeperFactory(resolverFactory, eventAdmin, subscriberMetrics, packaging);
- Map<String, Object> props = new HashMap<String, Object>();
- bookKeeperFactory.activate(context, props);
- subscriber.bookKeeperFactory = bookKeeperFactory;
+ MockitoAnnotations.initMocks(this);
when(packageBuilder.getType()).thenReturn("journal");
when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
+ mockMetrics();
+
when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender);
when(clientProvider.createPoller(
Mockito.anyString(),
@@ -203,7 +211,8 @@ public class SubscriberTest {
Mockito.anyString(),
packageCaptor.capture()))
.thenReturn(poller);
-
+ when(context.registerService(Mockito.any(Class.class), (DistributionAgent) eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg);
+
// you should call initSubscriber in each test method
}
@@ -218,6 +227,10 @@ public class SubscriberTest {
assumeNoPrecondition();
initSubscriber();
+ assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
+ assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(), equalTo(DistributionQueueState.IDLE));
+ assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
+
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
@@ -226,13 +239,20 @@ public class SubscriberTest {
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class))
).thenAnswer(new WaitFor(sem));
- packageHandler.handle(info, message);
+ packageHandler.handle(info, message);
+
waitSubscriber(RUNNING);
+ DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
+ DistributionQueueEntry item = queue.getHead();
+ assertThat(item.getStatus().getItemState(), equalTo(DistributionQueueItemState.QUEUED));
sem.release();
waitSubscriber(IDLE);
verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
anyObject());
+ List<String> log = subscriber.getLog().getLines();
+ // We do not use the DistributionLog anymore
+ assertThat(log.size(), equalTo(0));
}
@Test
@@ -248,10 +268,25 @@ public class SubscriberTest {
PackageMessage message = BASIC_DEL_PACKAGE;
packageHandler.handle(info, message);
- await().until(() -> getResource("/test"), nullValue());
+ waitSubscriber(RUNNING);
+ waitSubscriber(IDLE);
+ try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
+ assertThat(resolver.getResource("/test"), nullValue());
+ }
}
@Test
+ public void testExecuteNotSupported() throws DistributionException {
+ assumeNoPrecondition();
+ initSubscriber();
+
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "test");
+ DistributionResponse response = subscriber.execute(resourceResolver, request);
+ assertThat(response.getState(), equalTo(DistributionRequestState.DROPPED));
+ }
+
+
+ @Test
public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();
initSubscriber(ImmutableMap.of("maxRetries", "1"));
@@ -289,17 +324,37 @@ public class SubscriberTest {
initSubscriber();
MessageInfo info = new TestMessageInfo("", 1, 11, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.SKIP);
packageHandler.handle(info, message);
+ waitSubscriber(RUNNING);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
+
try {
waitSubscriber(IDLE);
fail("Cannot be IDLE without a validation status");
} catch (Throwable t) {
}
+
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
+ waitSubscriber(IDLE);
+
}
+ @Test
+ public void testReadyWhenWatingForPrecondition() {
+ Semaphore sem = new Semaphore(0);
+ assumeWaitingForPrecondition(sem);
+ initSubscriber();
+ MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+ PackageMessage message = BASIC_ADD_PACKAGE;
+
+ packageHandler.handle(info, message);
+ waitSubscriber(RUNNING);
+ await("Should report ready").until(subscriber.subscriberIdle::isReady);
+ sem.release();
+ }
+
private void initSubscriber() {
initSubscriber(Collections.emptyMap());
}
@@ -321,20 +376,51 @@ public class SubscriberTest {
await().until(subscriber::getState, equalTo(expectedState));
}
+ private void mockMetrics() {
+ Histogram histogram = Mockito.mock(Histogram.class);
+ Counter counter = Mockito.mock(Counter.class);
+ Meter meter = Mockito.mock(Meter.class);
+ Timer timer = Mockito.mock(Timer.class);
+ Timer.Context timerContext = Mockito.mock(Timer.Context.class);
+ when(timer.time())
+ .thenReturn(timerContext);
+ when(distributionMetricsService.getImportedPackageSize())
+ .thenReturn(histogram);
+ when(distributionMetricsService.getItemsBufferSize())
+ .thenReturn(counter);
+ when(distributionMetricsService.getFailedPackageImports())
+ .thenReturn(meter);
+ when(distributionMetricsService.getRemovedFailedPackageDuration())
+ .thenReturn(timer);
+ when(distributionMetricsService.getRemovedPackageDuration())
+ .thenReturn(timer);
+ when(distributionMetricsService.getImportedPackageDuration())
+ .thenReturn(timer);
+ when(distributionMetricsService.getSendStoredStatusDuration())
+ .thenReturn(timer);
+ when(distributionMetricsService.getProcessQueueItemDuration())
+ .thenReturn(timer);
+ when(distributionMetricsService.getPackageDistributedDuration())
+ .thenReturn(timer);
+ }
+
private void assumeNoPrecondition() {
try {
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.ACCEPT);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- private Resource getResource(String path) throws LoginException {
- try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
- return resolver.getResource(path);
+ private void assumeWaitingForPrecondition(Semaphore sem) {
+ try {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
+ .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
-
+
private final class WaitFor implements Answer<DistributionPackageInfo> {
private final Semaphore sem;