You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/03/26 12:03:22 UTC
[sling-org-apache-sling-distribution-journal] 01/01: SLING-9259 - Extract service subscriber code into separate package
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-9259
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 3ff39d5ee7d1607283a2a571b6c8e2224be1e839
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Mar 26 11:30:53 2020 +0100
SLING-9259 - Extract service subscriber code into separate package
---
.../impl/precondition/PackageStatusWatcher.java | 2 +-
.../impl/precondition/StagingPrecondition.java | 2 +-
.../journal/impl/publisher/DiscoveryService.java | 2 +-
.../impl/publisher/DistributionPublisher.java | 15 ++-
.../impl/publisher/PackageDistributedNotifier.java | 2 +-
.../journal/impl/publisher/PackageRepo.java | 4 +-
.../journal/impl/queue/impl/PubErrQueue.java | 2 +
.../journal/impl/queue/impl/PubQueue.java | 2 +
.../journal/impl/queue/impl/PubQueueCache.java | 4 +-
.../impl/queue/impl/PubQueueCacheService.java | 4 +-
.../impl/queue/impl/PubQueueProviderImpl.java | 2 +-
.../journal/impl/subscriber/Announcer.java | 1 +
.../journal/impl/subscriber/CommandPoller.java | 2 +-
.../impl/subscriber/DistributionSubscriber.java | 23 ++--
.../impl/{queue/impl => subscriber}/SubQueue.java | 5 +-
.../impl/subscriber/SubscriberConfiguration.java | 1 +
.../{impl => service}/subscriber/BookKeeper.java | 32 +++--
.../subscriber/ContentPackageExtractor.java | 2 +-
.../service/subscriber/ImportedEventFactory.java | 55 +++++++++
.../{impl => service}/subscriber/LocalStore.java | 2 +-
.../subscriber/PackageHandler.java | 34 +++++-
.../subscriber/PackageHandling.java | 2 +-
.../subscriber}/PackageRetries.java | 2 +-
.../subscriber/SubscriberIdle.java | 2 +-
.../subscriber/SubscriberMetrics.java} | 136 +--------------------
.../journal/{impl => }/shared/AgentState.java | 2 +-
.../{impl => }/shared/DefaultDistributionLog.java | 2 +-
.../shared/DistributionMetricsService.java | 112 +----------------
.../{impl/queue/impl => shared}/EntryUtil.java | 2 +-
.../{impl => }/shared/ExponentialBackOff.java | 2 +-
.../journal/{impl => }/shared/JMXRegistration.java | 2 +-
.../{impl => }/shared/JournalAvailableChecker.java | 4 +-
.../shared/JournalAvailableServiceMarker.java | 2 +-
.../journal/{impl => }/shared/LimitPoller.java | 2 +-
.../journal/{impl => }/shared/PackageBrowser.java | 2 +-
.../{impl => }/shared/PackageViewerPlugin.java | 2 +-
.../queue/impl => shared}/QueueEntryFactory.java | 2 +-
.../shared/SimpleDistributionResponse.java | 2 +-
.../journal/{impl => }/shared/Topics.java | 2 +-
.../precondition/PackageStatusWatcherTest.java | 4 +-
.../impl/precondition/StagingPreconditionTest.java | 4 +-
.../impl/publisher/DiscoveryServiceTest.java | 4 +-
.../impl/publisher/DistPublisherJMXTest.java | 3 +-
.../impl/publisher/DistributionPublisherTest.java | 4 +-
.../publisher/PackageDistributedNotifierTest.java | 2 +-
.../journal/impl/publisher/PackageRepoTest.java | 4 +-
.../journal/impl/queue/QueueItemFactoryTest.java | 2 +-
.../journal/impl/queue/impl/EntryUtilTest.java | 1 +
.../impl/queue/impl/OffsetQueueImplJMXTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueCacheTest.java | 4 +-
.../impl/queue/impl/PubQueueProviderTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueTest.java | 1 +
.../journal/impl/queue/impl/RangePollerTest.java | 2 +-
.../journal/impl/subscriber/AnnouncerTest.java | 2 +
.../journal/impl/subscriber/CommandPollerTest.java | 4 +-
.../journal/impl/subscriber/SubscriberTest.java | 49 ++++----
.../subscriber/BookKeeperTest.java | 7 +-
.../subscriber/ContentPackageExtractorTest.java | 4 +-
.../subscriber/LocalStoreTest.java | 3 +-
.../subscriber}/PackageRetriesTest.java | 4 +-
.../impl => service/subscriber}/SubQueueTest.java | 5 +-
.../subscriber/SubscriberIdleTest.java | 3 +-
.../shared/DistributionMetricsServiceTest.java | 14 +--
.../{impl => }/shared/ExponentialBackoffTest.java | 3 +-
.../shared/JournalAvailableCheckerTest.java | 11 +-
.../journal/{impl => }/shared/LimitPollerTest.java | 4 +-
.../{impl => }/shared/PackageBrowserTest.java | 3 +-
.../{impl => }/shared/PackageViewerPluginTest.java | 5 +-
.../shared/SimpleDistributionResponseTest.java | 3 +-
.../journal/{impl => }/shared/TestMessageInfo.java | 2 +-
70 files changed, 253 insertions(+), 392 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..506cf55 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
public class PackageStatusWatcher implements Closeable {
private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..bbad307 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index af052a3..c201a42 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.commons.io.IOUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..c1c75d0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
import static java.util.stream.StreamSupport.stream;
-import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
import static org.apache.sling.distribution.DistributionRequestType.ADD;
import static org.apache.sling.distribution.DistributionRequestType.DELETE;
import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
import java.util.Arrays;
import java.util.Collections;
@@ -47,11 +47,6 @@ import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
@@ -73,9 +68,13 @@ import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
-
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 591a0cf..e3dace3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index aee0c52..81a4fba 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,8 +25,6 @@ import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.nodetype.NodeType;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.jackrabbit.api.ReferenceBinary;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.LoginException;
@@ -47,6 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index aaa7b1c..db27714 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 9bfd4fd..197d0b1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 7eb3451..6621979 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -40,8 +40,6 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
@@ -52,6 +50,8 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 46a2d00..eac1f1e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -20,9 +20,9 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..550c055 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -29,11 +29,11 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..beeb62d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,6 +32,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..3a1df97 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..b044704 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -67,12 +67,15 @@ import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandler;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -133,7 +136,7 @@ public class DistributionSubscriber implements DistributionAgent {
private Precondition precondition;
@Reference
- private DistributionMetricsService distributionMetricsService;
+ private SubscriberMetrics subcriberMetrics;
@Reference
private Packaging packaging;
@@ -188,7 +191,7 @@ public class DistributionSubscriber implements DistributionAgent {
ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
+ bookKeeper = new BookKeeper(resolverFactory, subcriberMetrics, packageHandler, eventAdmin,
sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
long startOffset = bookKeeper.loadOffset() + 1;
@@ -339,7 +342,7 @@ public class DistributionSubscriber implements DistributionAgent {
try {
while (running) {
if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
- distributionMetricsService.getItemsBufferSize().increment();
+ subcriberMetrics.getItemsBufferSize().increment();
return;
}
}
@@ -368,7 +371,7 @@ public class DistributionSubscriber implements DistributionAgent {
bookKeeper.sendStoredStatus();
DistributionQueueItem item = blockingPeekQueueItem();
- try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
+ try (Timer.Context context = subcriberMetrics.getProcessQueueItemDuration().time()) {
processQueueItem(item);
} finally {
subscriberIdle.idle();
@@ -412,7 +415,7 @@ public class DistributionSubscriber implements DistributionAgent {
bookKeeper.importPackage(pkgMsg, offset, createdTime);
}
queueItemsBuffer.remove();
- distributionMetricsService.getItemsBufferSize().decrement();
+ subcriberMetrics.getItemsBufferSize().decrement();
}
private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
similarity index 95%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
index 5d34e6d..2d7a3ca 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubQueue.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.subscriber;
import java.util.Collections;
import java.util.List;
@@ -29,6 +29,9 @@ import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..625a62b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
similarity index 90%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
index 53a7245..224279c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
@@ -42,13 +42,11 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -85,7 +83,7 @@ public class BookKeeper implements Closeable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ResourceResolverFactory resolverFactory;
- private final DistributionMetricsService distributionMetricsService;
+ private final SubscriberMetrics subsciberMetrics;
private final PackageHandler packageHandler;
private final EventAdmin eventAdmin;
private final Consumer<PackageStatusMessage> sender;
@@ -102,7 +100,7 @@ public class BookKeeper implements Closeable {
private int skippedCounter = 0;
public BookKeeper(ResourceResolverFactory resolverFactory,
- DistributionMetricsService distributionMetricsService,
+ SubscriberMetrics subcriberMetrics,
PackageHandler packageHandler,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
@@ -113,9 +111,9 @@ public class BookKeeper implements Closeable {
this.packageHandler = packageHandler;
this.eventAdmin = eventAdmin;
String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
- this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+ this.retriesGauge = subcriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
this.resolverFactory = resolverFactory;
- this.distributionMetricsService = distributionMetricsService;
+ this.subsciberMetrics = subcriberMetrics;
this.sender = sender;
this.subAgentName = subAgentName;
this.subSlingId = subSlingId;
@@ -148,7 +146,7 @@ public class BookKeeper implements Closeable {
log.info("Importing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
addPackageMDC(pkgMsg);
- try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+ try (Timer.Context context = subsciberMetrics.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
@@ -156,10 +154,10 @@ public class BookKeeper implements Closeable {
}
storeOffset(importerResolver, offset);
importerResolver.commit();
- distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
- distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+ subsciberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
+ subsciberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
packageRetries.clear(pkgMsg.getPubAgentName());
- Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+ Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
eventAdmin.postEvent(event);
} catch (LoginException | IOException | RuntimeException e) {
failure(pkgMsg, offset, e);
@@ -195,7 +193,7 @@ public class BookKeeper implements Closeable {
* @throws DistributionException if the package should be retried
*/
private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
- distributionMetricsService.getFailedPackageImports().mark();
+ subsciberMetrics.getFailedPackageImports().mark();
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
@@ -213,7 +211,7 @@ public class BookKeeper implements Closeable {
public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
log.info("Removing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+ Timer.Context context = subsciberMetrics.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -250,7 +248,7 @@ public class BookKeeper implements Closeable {
* @throws InterruptedException
*/
public void sendStoredStatus() throws InterruptedException {
- try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+ try (Timer.Context context = subsciberMetrics.getSendStoredStatusDuration().time()) {
PackageStatus status = new PackageStatus(statusStore.load());
boolean sent = status.sent;
int retry = 0;
@@ -315,7 +313,7 @@ public class BookKeeper implements Closeable {
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
log.info("Removing failed distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+ Timer.Context context = subsciberMetrics.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
index 998f929..f17f7b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.util.Objects.requireNonNull;
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
new file mode 100644
index 0000000..8ad6f86
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
+import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.osgi.service.event.Event;
+/** Static factory that builds the OSGi {@link Event} with topic IMPORTER_PACKAGE_IMPORTED from a package message. */
+@ParametersAreNonnullByDefault
+public class ImportedEventFactory {
+ /** Event property key under which {@link #create} stores the package id. */
+ public static final String PACKAGE_ID = "distribution.package.id";
+ private static final String KIND_IMPORTER = "importer";
+ /** Not instantiable: the class only exposes static members. */
+ private ImportedEventFactory() {
+ }
+ /** Creates the event carrying component kind ("importer"), agent name, request type name, paths array and package id. */
+ public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
+ String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
+ Map<String, Object> props = new HashMap<>();
+ props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
+ props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
+ props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+ props.put(DISTRIBUTION_PATHS, pathsList);
+ props.put(PACKAGE_ID, pkgMsg.getPkgId());
+ return new Event(IMPORTER_PACKAGE_IMPORTED, props);
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
index c619d79..0a3c541 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
similarity index 71%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
index a7148c4..aadc77a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
@@ -16,18 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.slf4j.Logger;
@@ -66,13 +73,32 @@ public class PackageHandler {
LOG.info("Importing paths {}",pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
- pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
+ pkgStream = pkgStream(resolver, pkgMsg);
packageBuilder.installPackage(resolver, pkgStream);
extractor.handle(resolver, pkgMsg.getPathsList());
} finally {
IOUtils.closeQuietly(pkgStream);
}
-
+ }
+ /** Returns a stream over the package content: the binary embedded in the message if present, otherwise the binary resolved from the message's binary reference; throws DistributionException if no JCR session can be obtained or the repository call fails. */
+ @Nonnull
+ public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+ if (pkgMsg.hasPkgBinary()) { // content travels inside the message itself
+ return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+ } else {
+ String pkgBinRef = pkgMsg.getPkgBinaryRef(); // no embedded binary: fall back to the reference string
+ try {
+ Session session = resolver.adaptTo(Session.class);
+ if (session == null) { // adaptTo yielded no JCR session for this resolver
+ throw new DistributionException("Unable to get Oak session");
+ }
+ ValueFactory factory = session.getValueFactory();
+ Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary(); // turn the reference into a JCR Binary via the session's ValueFactory
+ return binary.getStream();
+ } catch (RepositoryException e) {
+ throw new DistributionException(e.getMessage(), e); // keep the repository exception as the cause
+ }
+ }
}
private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
index 55dc55e..a1130fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
public enum PackageHandling {
Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
index 23f9463..3208572 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
index a05b80c..d51da87 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.io.Closeable;
import java.util.Hashtable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
similarity index 63%
copy from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
copy to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
index f18c04c..8dd6cab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
import java.io.Closeable;
import java.util.Dictionary;
import java.util.Hashtable;
-import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.sling.commons.metrics.Counter;
@@ -41,8 +40,8 @@ import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component(service = DistributionMetricsService.class)
-public class DistributionMetricsService {
+@Component(service = SubscriberMetrics.class)
+public class SubscriberMetrics {
public static final String BASE_COMPONENT = "distribution.journal";
@@ -55,18 +54,8 @@ public class DistributionMetricsService {
@Reference
private MetricsService metricsService;
- private Counter cleanupPackageRemovedCount;
-
- private Timer cleanupPackageDuration;
-
private Histogram importedPackageSize;
- private Histogram exportedPackageSize;
-
- private Meter acceptedRequests;
-
- private Meter droppedRequests;
-
private Counter itemsBufferSize;
private Timer removedPackageDuration;
@@ -83,26 +72,11 @@ public class DistributionMetricsService {
private Timer packageDistributedDuration;
- private Timer buildPackageDuration;
-
- private Timer enqueuePackageDuration;
-
- private Counter queueCacheFetchCount;
-
private BundleContext context;
@Activate
public void activate(BundleContext context) {
this.context = context;
- cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
- cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
- exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
- acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests"));
- droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests"));
- buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
- enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
- queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -115,56 +89,6 @@ public class DistributionMetricsService {
}
/**
- * Runs provided code updating provided metric
- * with its execution time.
- * The method guarantees that the metric is updated
- * even if the code throws an exception
- * @param metric metric to update
- * @param code code to clock
- * @throws Exception actually it doesn't
- */
- public static void timed(Timer metric, Runnable code) throws Exception {
- try (Timer.Context ignored = metric.time()) {
- code.run();
- }
- }
-
- /**
- * Runs provided code updating provided metric
- * with its execution time.
- * The method guarantees that the metric is updated
- * even if the code throws an exception
- * @param metric metric to update
- * @param code code to clock
- * @return a value returned but <code>code.call()</code> invocation
- * @throws Exception if underlying code throws
- */
- public static <T> T timed(Timer metric, Callable<T> code) throws Exception {
- try (Timer.Context ignored = metric.time()) {
- return code.call();
- }
- }
-
- /**
- * Counter of package removed during the Package Cleanup Task.
- * The count is the sum of all packages removed since the service started.
- *
- * @return a Sling Metrics timer
- */
- public Counter getCleanupPackageRemovedCount() {
- return cleanupPackageRemovedCount;
- }
-
- /**
- * Timer of the Package Cleanup Task execution duration.
- *
- * @return a Sling Metrics timer
- */
- public Timer getCleanupPackageDuration() {
- return cleanupPackageDuration;
- }
-
- /**
* Histogram of the imported content package size in Byte.
*
* @return a Sling Metrics histogram
@@ -174,33 +98,6 @@ public class DistributionMetricsService {
}
/**
- * Histogram of the exported content package size in Bytes.
- *
- * @return a Sling Metrics histogram
- */
- public Histogram getExportedPackageSize() {
- return exportedPackageSize;
- }
-
- /**
- * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state.
- *
- * @return a Sling Metrics meter
- */
- public Meter getAcceptedRequests() {
- return acceptedRequests;
- }
-
- /**
- * Meter of requests returning an {@code DistributionRequestState.DROPPED} state.
- *
- * @return a Sling Metrics meter
- */
- public Meter getDroppedRequests() {
- return droppedRequests;
- }
-
- /**
* Counter of the package buffer size on the subscriber.
*
* @return a Sling Metrics counter
@@ -273,33 +170,6 @@ public class DistributionMetricsService {
return packageDistributedDuration;
}
- /**
- * Timer capturing the duration in ms of building a content package
- *
- * @return a Sling Metric timer
- */
- public Timer getBuildPackageDuration() {
- return buildPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of adding a package to the queue
- *
- * @return a Sling Metric timer
- */
- public Timer getEnqueuePackageDuration() {
- return enqueuePackageDuration;
- }
-
- /**
- * Counter of fetch operations to feed the queue cache.
- *
- * @return a Sling Metric counter
- */
- public Counter getQueueCacheFetchCount() {
- return queueCacheFetchCount;
- }
-
public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
return new GaugeService<>(name, description, supplier);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
index 858e8dc..51d5466 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
index bb49514..80c3747 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
similarity index 69%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f18c04c..1f1b279 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.lang.String.format;
@@ -59,30 +59,12 @@ public class DistributionMetricsService {
private Timer cleanupPackageDuration;
- private Histogram importedPackageSize;
-
private Histogram exportedPackageSize;
private Meter acceptedRequests;
private Meter droppedRequests;
- private Counter itemsBufferSize;
-
- private Timer removedPackageDuration;
-
- private Timer removedFailedPackageDuration;
-
- private Timer importedPackageDuration;
-
- private Meter failedPackageImports;
-
- private Timer sendStoredStatusDuration;
-
- private Timer processQueueItemDuration;
-
- private Timer packageDistributedDuration;
-
private Timer buildPackageDuration;
private Timer enqueuePackageDuration;
@@ -102,16 +84,6 @@ public class DistributionMetricsService {
buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
- importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
- itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
- importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
- removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
- removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
- failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
- sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
- processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
- packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
}
/**
@@ -165,15 +137,6 @@ public class DistributionMetricsService {
}
/**
- * Histogram of the imported content package size in Byte.
- *
- * @return a Sling Metrics histogram
- */
- public Histogram getImportedPackageSize() {
- return importedPackageSize;
- }
-
- /**
* Histogram of the exported content package size in Bytes.
*
* @return a Sling Metrics histogram
@@ -201,79 +164,6 @@ public class DistributionMetricsService {
}
/**
- * Counter of the package buffer size on the subscriber.
- *
- * @return a Sling Metrics counter
- */
- public Counter getItemsBufferSize() {
- return itemsBufferSize;
- }
-
- /**
- * Timer capturing the duration in ms of successful packages import operations.
- *
- * @return a Sling Metrics timer
- */
- public Timer getImportedPackageDuration() {
- return importedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedPackageDuration() {
- return removedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedFailedPackageDuration() {
- return removedFailedPackageDuration;
- }
-
- /**
- * Meter of failures to import packages.
- *
- * @return a Sling Metrics meter
- */
- public Meter getFailedPackageImports() {
- return failedPackageImports;
- }
-
- /**
- * Timer capturing the duration in ms of sending a stored package status.
- *
- * @return a Sling Metric timer
- */
- public Timer getSendStoredStatusDuration() {
- return sendStoredStatusDuration;
- }
-
- /**
- * Timer capturing the duration in ms of processing a queue item.
- *
- * @return a Sling Metric timer
- */
- public Timer getProcessQueueItemDuration() {
- return processQueueItemDuration;
- }
-
- /**
- * Timer capturing the duration in ms of distributing a distribution package.
- * The timer starts when the package is enqueued and stops when the package is successfully imported.
- *
- * @return a Sling Metric timer
- */
- public Timer getPackageDistributedDuration() {
- return packageDistributedDuration;
- }
-
- /**
* Timer capturing the duration in ms of building a content package
*
* @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
index 529fc52..ed2d461 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
index 28f5aa8..48727a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e4f36c9..a1e3924 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 3582b36..6cb5170 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Objects.requireNonNull;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
index 287f3fa..9478372 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.util.Objects;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
index 2bb30b5..18cf976 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
index b9d1d2d..72eb177 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
index 30f8836..8ac9ba5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
index 47a9f25..58a0717 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
index e49700e..eb0510c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
index 1f4d44a..c9ce5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 571d98a..1484a9e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
package org.apache.sling.distribution.journal.impl.precondition;
import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..b078605 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -36,9 +36,9 @@ import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 5e7b76c..2b82486 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,8 +30,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +38,8 @@ import org.mockito.Mockito;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 2a41a2f..f52026c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -53,8 +54,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-
@SuppressWarnings("rawtypes")
public class DistPublisherJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..434c561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index acc91b0..ab5d17e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
import java.util.Collections;
import java.util.HashSet;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index a8fb467..9b41463 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,8 +30,6 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -57,6 +55,8 @@ import org.mockito.Spy;
import org.osgi.framework.BundleContext;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 6aeb1ba..e3b5582 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
import static org.junit.Assert.assertThat;
import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.MessageInfo;
public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 11a52f9..5aededc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import java.util.HashMap;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index 18cc957..e7f9639 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.junit.Test;
public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 1113319..5fca8a3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 61c1389..166ad1a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,7 +38,6 @@ import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import com.google.protobuf.GeneratedMessage;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -58,6 +57,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 20dccea..211b9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 778bbaa..3cef383 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,7 +28,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +40,7 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 05f8a88..e669a82 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,9 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.function.Consumer;
+import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index d6c774e..0de3d2e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..e3dd57b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -48,11 +48,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
@@ -70,6 +65,20 @@ import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -97,16 +106,6 @@ import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@@ -173,7 +172,7 @@ public class SubscriberTest {
private MessageSender<PackageStatusMessage> statusSender;
@Mock
- private DistributionMetricsService distributionMetricsService;
+ private SubscriberMetrics subscriberMetrics;
@InjectMocks
DistributionSubscriber subscriber;
@@ -384,23 +383,23 @@ public class SubscriberTest {
Timer.Context timerContext = Mockito.mock(Timer.Context.class);
when(timer.time())
.thenReturn(timerContext);
- when(distributionMetricsService.getImportedPackageSize())
+ when(subscriberMetrics.getImportedPackageSize())
.thenReturn(histogram);
- when(distributionMetricsService.getItemsBufferSize())
+ when(subscriberMetrics.getItemsBufferSize())
.thenReturn(counter);
- when(distributionMetricsService.getFailedPackageImports())
+ when(subscriberMetrics.getFailedPackageImports())
.thenReturn(meter);
- when(distributionMetricsService.getRemovedFailedPackageDuration())
+ when(subscriberMetrics.getRemovedFailedPackageDuration())
.thenReturn(timer);
- when(distributionMetricsService.getRemovedPackageDuration())
+ when(subscriberMetrics.getRemovedPackageDuration())
.thenReturn(timer);
- when(distributionMetricsService.getImportedPackageDuration())
+ when(subscriberMetrics.getImportedPackageDuration())
.thenReturn(timer);
- when(distributionMetricsService.getSendStoredStatusDuration())
+ when(subscriberMetrics.getSendStoredStatusDuration())
.thenReturn(timer);
- when(distributionMetricsService.getProcessQueueItemDuration())
+ when(subscriberMetrics.getProcessQueueItemDuration())
.thenReturn(timer);
- when(distributionMetricsService.getPackageDistributedDuration())
+ when(subscriberMetrics.getPackageDistributedDuration())
.thenReturn(timer);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
similarity index 89%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
index fd4c761..6d88be9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -26,7 +26,6 @@ import java.util.function.Consumer;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
@@ -44,7 +43,7 @@ public class BookKeeperTest {
private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
@Mock
- private DistributionMetricsService distributionMetricsService;
+ private SubscriberMetrics subscriberMetrics;
@Mock
private PackageHandler packageHandler;
@@ -59,7 +58,7 @@ public class BookKeeperTest {
@Before
public void before() {
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
+ bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender,
"subAgentName", "subSlingId", true, 10);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
index 0868cc8..16805a9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.util.Collections.singletonList;
import static org.mockito.Mockito.verify;
@@ -37,6 +37,8 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.sling.MockSling;
import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
index 11b346a..1a03199 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
index 6c33412..4f2f0a4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
@@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
public class PackageRetriesTest {
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
index d2784e9..90ef7b5 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubQueueTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +26,9 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.impl.subscriber.SubQueue;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
import com.google.common.collect.Lists;
public class SubQueueTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
index ef5f0d7..6c40034 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
import org.junit.Test;
import org.mockito.Mockito;
import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 1049ef5..50d5507 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertNotNull;
@@ -32,7 +32,8 @@ import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -94,16 +95,7 @@ public class DistributionMetricsServiceTest {
assertNotNull(metrics.getDroppedRequests());
assertNotNull(metrics.getEnqueuePackageDuration());
assertNotNull(metrics.getExportedPackageSize());
- assertNotNull(metrics.getFailedPackageImports());
- assertNotNull(metrics.getImportedPackageDuration());
- assertNotNull(metrics.getImportedPackageSize());
- assertNotNull(metrics.getItemsBufferSize());
- assertNotNull(metrics.getPackageDistributedDuration());
- assertNotNull(metrics.getProcessQueueItemDuration());
assertNotNull(metrics.getQueueCacheFetchCount());
- assertNotNull(metrics.getRemovedFailedPackageDuration());
- assertNotNull(metrics.getRemovedPackageDuration());
- assertNotNull(metrics.getSendStoredStatusDuration());
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
index bd44057..8fe7ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.time.Duration.of;
import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index ce7f279..0214b83 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@@ -35,9 +35,12 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
index fed9fef..7fd0207 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.time.Duration;
import java.util.List;
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +41,7 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.LimitPoller;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
index 14dcc19..ede1b69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
index aa2947a..f483c86 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.containsString;
@@ -43,6 +43,9 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
index 5d31bf6..7202b6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
import org.junit.Test;
public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
index a2105e3..9145dfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import org.apache.sling.distribution.journal.MessageInfo;