You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/03/30 16:34:11 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9259 - Extract service subscriber code into separate package
(#25)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 9f2175a SLING-9259 - Extract service subscriber code into separate package (#25)
9f2175a is described below
commit 9f2175a3084c66862f7172fceae483a4a159d7b2
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Mon Mar 30 18:34:04 2020 +0200
SLING-9259 - Extract service subscriber code into separate package (#25)
* SLING-9259 - Extract service subscriber code into separate package
* SLING-9259 - Remove SubQueue, add factory for BookKeeper, simplify PreCondition
* SLING-9259 - Provide simpler facade for BookKeeper
---
.../impl/precondition/DefaultPrecondition.java | 4 +-
.../impl/precondition/PackageStatusWatcher.java | 2 +-
.../journal/impl/precondition/Precondition.java | 3 +-
.../impl/precondition/StagingPrecondition.java | 32 +--
.../journal/impl/publisher/DiscoveryService.java | 2 +-
.../impl/publisher/DistributionPublisher.java | 15 +-
.../impl/publisher/PackageDistributedNotifier.java | 2 +-
.../journal/impl/publisher/PackageRepo.java | 4 +-
.../journal/impl/queue/impl/PubErrQueue.java | 2 +
.../journal/impl/queue/impl/PubQueue.java | 2 +
.../journal/impl/queue/impl/PubQueueCache.java | 8 +-
.../impl/queue/impl/PubQueueCacheService.java | 4 +-
.../impl/queue/impl/PubQueueProviderImpl.java | 2 +-
.../journal/impl/queue/impl/SubQueue.java | 164 -------------
.../journal/impl/subscriber/Announcer.java | 1 +
.../journal/impl/subscriber/CommandPoller.java | 2 +-
.../impl/subscriber/DistributionSubscriber.java | 269 +++++++--------------
...ling.java => PreConditionTimeoutException.java} | 9 +-
.../impl/subscriber/SubscriberConfiguration.java | 1 +
.../{impl => service}/subscriber/BookKeeper.java | 85 ++++---
.../service/subscriber/BookKeeperFactory.java | 90 +++++++
.../subscriber/ContentPackageExtractor.java | 4 +-
.../service/subscriber/ImportedEventFactory.java | 55 +++++
.../{impl => service}/subscriber/LocalStore.java | 4 +-
.../journal/service/subscriber/NoopMetric.java | 99 ++++++++
.../subscriber/PackageHandler.java | 45 +++-
.../subscriber/PackageHandling.java | 2 +-
.../subscriber}/PackageRetries.java | 4 +-
.../subscriber/SubscriberIdle.java | 9 +-
.../subscriber/SubscriberMetrics.java} | 156 ++----------
.../journal/{impl => }/shared/AgentState.java | 2 +-
.../{impl => }/shared/DefaultDistributionLog.java | 2 +-
.../shared/DistributionMetricsService.java | 114 +--------
.../{impl/queue/impl => shared}/EntryUtil.java | 2 +-
.../{impl => }/shared/ExponentialBackOff.java | 2 +-
.../journal/{impl => }/shared/JMXRegistration.java | 2 +-
.../{impl => }/shared/JournalAvailableChecker.java | 4 +-
.../shared/JournalAvailableServiceMarker.java | 2 +-
.../journal/{impl => }/shared/LimitPoller.java | 2 +-
.../journal/{impl => }/shared/PackageBrowser.java | 2 +-
.../{impl => }/shared/PackageViewerPlugin.java | 2 +-
.../queue/impl => shared}/QueueEntryFactory.java | 2 +-
.../shared/SimpleDistributionResponse.java | 2 +-
.../journal/{impl => }/shared/Topics.java | 2 +-
.../impl/precondition/DefaultPreconditionTest.java | 5 +-
.../precondition/PackageStatusWatcherTest.java | 4 +-
.../impl/precondition/StagingPreconditionTest.java | 56 ++---
.../impl/publisher/DiscoveryServiceTest.java | 4 +-
.../impl/publisher/DistPublisherJMXTest.java | 3 +-
.../impl/publisher/DistributionPublisherTest.java | 4 +-
.../publisher/PackageDistributedNotifierTest.java | 2 +-
.../journal/impl/publisher/PackageRepoTest.java | 4 +-
.../journal/impl/queue/QueueItemFactoryTest.java | 2 +-
.../journal/impl/queue/impl/EntryUtilTest.java | 1 +
.../impl/queue/impl/OffsetQueueImplJMXTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueCacheTest.java | 4 +-
.../impl/queue/impl/PubQueueProviderTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueTest.java | 1 +
.../journal/impl/queue/impl/RangePollerTest.java | 2 +-
.../journal/impl/queue/impl/SubQueueTest.java | 72 ------
.../journal/impl/subscriber/AnnouncerTest.java | 2 +
.../journal/impl/subscriber/CommandPollerTest.java | 4 +-
.../journal/impl/subscriber/SubscriberTest.java | 170 ++++---------
.../subscriber/BookKeeperTest.java | 46 +++-
.../subscriber/ContentPackageExtractorTest.java | 3 +-
.../subscriber/LocalStoreTest.java | 3 +-
.../subscriber}/PackageRetriesTest.java | 4 +-
.../subscriber/SubscriberIdleTest.java | 3 +-
.../shared/DistributionMetricsServiceTest.java | 14 +-
.../{impl => }/shared/ExponentialBackoffTest.java | 3 +-
.../shared/JournalAvailableCheckerTest.java | 11 +-
.../journal/{impl => }/shared/LimitPollerTest.java | 4 +-
.../{impl => }/shared/PackageBrowserTest.java | 3 +-
.../{impl => }/shared/PackageViewerPluginTest.java | 5 +-
.../shared/SimpleDistributionResponseTest.java | 3 +-
.../journal/{impl => }/shared/TestMessageInfo.java | 2 +-
76 files changed, 671 insertions(+), 1000 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index dceed02..5144378 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
@Component(immediate = true, service = Precondition.class, property = { "name=default" })
public class DefaultPrecondition implements Precondition {
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) {
- return true;
+ public Decision canProcess(String subAgentName, long pkgOffset) {
+ return Decision.ACCEPT;
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0408e0d..506cf55 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
public class PackageStatusWatcher implements Closeable {
private final Closeable poller;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 3730475..d934431 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -33,6 +33,7 @@ public interface Precondition {
* @throws InterruptedException if the thread was interrupted and should shut down
* @return true if the package can be processed; otherwise it returns false.
*/
- boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException;
+ Decision canProcess(String subAgentName, long pkgOffset);
+ enum Decision { ACCEPT, SKIP, WAIT};
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 2272888..ad6be9c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,12 +21,10 @@ package org.apache.sling.distribution.journal.impl.precondition;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-import java.util.concurrent.TimeoutException;
-
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -56,8 +54,6 @@ public class StagingPrecondition implements Precondition, Runnable {
private volatile PackageStatusWatcher watcher;
- private volatile boolean running = true;
-
@Activate
public void activate() {
watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -67,31 +63,15 @@ public class StagingPrecondition implements Precondition, Runnable {
@Deactivate
public synchronized void deactivate() {
IOUtils.closeQuietly(watcher);
- running = false;
}
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException {
- if (timeoutSeconds < 1) {
- throw new IllegalArgumentException();
- }
-
- // try to get the status for timeoutSeconds and then throw
- for(int i=0; i < timeoutSeconds * 10; i++) {
- Status status = getStatus(subAgentName, pkgOffset);
-
- if (status != null) {
- return status == Status.IMPORTED;
- } else {
- Thread.sleep(100);
- }
-
- if (!running) {
- throw new InterruptedException("Staging precondition is shutting down");
- }
+ public Decision canProcess(String subAgentName, long pkgOffset) {
+ Status status = getStatus(subAgentName, pkgOffset);
+ if (status == null) {
+ return Decision.WAIT;
}
-
- throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic.");
+ return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
}
private synchronized Status getStatus(String subAgentName, long pkgOffset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index af052a3..c201a42 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -28,9 +28,9 @@ import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.commons.io.IOUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..c1c75d0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -20,12 +20,12 @@ package org.apache.sling.distribution.journal.impl.publisher;
import static java.util.stream.StreamSupport.stream;
-import static org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.timed;
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
import static org.apache.sling.distribution.DistributionRequestType.ADD;
import static org.apache.sling.distribution.DistributionRequestType.DELETE;
import static org.apache.sling.distribution.DistributionRequestType.TEST;
+import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
import java.util.Arrays;
import java.util.Collections;
@@ -47,11 +47,6 @@ import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
@@ -73,9 +68,13 @@ import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
-
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.AgentState;
+import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 591a0cf..e3dace3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -29,8 +29,8 @@ import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index aee0c52..81a4fba 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -25,8 +25,6 @@ import javax.jcr.Node;
import javax.jcr.Property;
import javax.jcr.nodetype.NodeType;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.jackrabbit.api.ReferenceBinary;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.LoginException;
@@ -47,6 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index aaa7b1c..db27714 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 9bfd4fd..197d0b1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -40,6 +40,8 @@ import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
+import org.apache.sling.distribution.journal.shared.QueueEntryFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..b794116 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -41,8 +41,6 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
@@ -53,6 +51,8 @@ import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
@@ -176,7 +176,9 @@ public class PubQueueCache {
sender.send(topic, pkgMsg);
sleep(seedingDelayMs);
} catch (MessagingException e) {
- LOG.warn(e.getMessage(), e);
+ if (!(e.getCause() instanceof InterruptedException)) {
+ LOG.warn(e.getMessage(), e);
+ }
sleep(seedingDelayMs * 10);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index c5eeb8c..5709493 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -22,9 +22,9 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index cc9e839..a72a358 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -30,11 +30,11 @@ import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
deleted file mode 100644
index 5d34e6d..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
-import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.BLOCKED;
-import static org.apache.sling.distribution.queue.DistributionQueueState.IDLE;
-import static org.apache.sling.distribution.queue.DistributionQueueState.RUNNING;
-import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
-
-@ParametersAreNonnullByDefault
-public class SubQueue implements DistributionQueue {
-
- private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
-
- private final DistributionQueueItem headItem;
-
- private final PackageRetries packageRetries;
-
- private final String queueName;
-
- private final QueueEntryFactory entryFactory;
-
- public SubQueue(String queueName,
- @Nullable
- DistributionQueueItem headItem,
- PackageRetries packageRetries) {
- this.headItem = headItem;
- this.queueName = Objects.requireNonNull(queueName);
- this.packageRetries = Objects.requireNonNull(packageRetries);
- this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
- }
-
- @Nonnull
- @Override
- public String getName() {
- return queueName;
- }
-
- @Override
- public DistributionQueueEntry add(DistributionQueueItem queueItem) {
- throw new UnsupportedOperationException("Unsupported add operation");
- }
-
- @Override
- @CheckForNull
- public DistributionQueueEntry getHead() {
- return entryFactory.create(headItem);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> getEntries(int skip, int limit) {
- final List<DistributionQueueEntry> entries;
- if (skip == 0 && (limit == -1 || limit > 0) && headItem != null) {
- entries = Collections.singletonList(entryFactory.create(headItem));
- } else {
- entries = Collections.emptyList();
- }
- return Collections.unmodifiableList(entries);
- }
-
- @Override
- public DistributionQueueEntry getEntry(String entryId) {
- return (entryId.equals(EntryUtil.entryId(headItem)))
- ? entryFactory.create(headItem)
- : null;
- }
-
- @Override
- public DistributionQueueEntry remove(String entryId) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public Iterable<DistributionQueueEntry> clear(int limit) {
- throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
- }
-
- @Nonnull
- @Override
- public DistributionQueueStatus getStatus() {
- final DistributionQueueState queueState;
- final int itemsCount;
- DistributionQueueEntry headEntry = getHead();
- if (headEntry != null) {
- itemsCount = 1;
- DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
- if (itemState == QUEUED) {
- queueState = RUNNING;
- } else {
- queueState = BLOCKED;
- }
- } else {
- itemsCount = 0;
- queueState = IDLE;
- }
-
- return new DistributionQueueStatus(itemsCount, queueState);
- }
-
- @Override
- @Nonnull
- public DistributionQueueType getType() {
- return ORDERED;
- }
-
- @Override
- public boolean hasCapability(String capability) {
- return false;
- }
-
- private int attempts(DistributionQueueItem queueItem) {
- String entryId = EntryUtil.entryId(queueItem);
- return packageRetries.get(entryId);
- }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index 7ecd03b..beeb62d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -32,6 +32,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index f9d047b..3a1df97 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -27,8 +27,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..a86cbc5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -24,21 +24,14 @@ import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
import java.io.Closeable;
import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
@@ -46,37 +39,25 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.jackrabbit.vault.packaging.Packaging;
-import org.apache.sling.api.resource.LoginException;
-import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
-import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.log.spi.DistributionLog;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -84,7 +65,6 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,28 +75,18 @@ import com.google.protobuf.GeneratedMessage;
* A Subscriber SCD agent which consumes messages produced by a
* {@code DistributionPublisher} agent.
*/
-@Component(service = {}, immediate = true, property = {
- "announceDelay=10000" }, configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+@Component(service = DistributionSubscriber.class, immediate = true,
+ property = { "announceDelay=10000" },
+ configurationPid = "org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
-public class DistributionSubscriber implements DistributionAgent {
+public class DistributionSubscriber {
private static final int PRECONDITION_TIMEOUT = 60;
static int RETRY_DELAY = 5000;
static int QUEUE_FETCH_DELAY = 1000;
private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
- private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
-
- @Reference(name = "packageBuilder")
- private DistributionPackageBuilder packageBuilder;
-
- @Reference
- private SlingSettingsService slingSettings;
-
- @Reference
- private ResourceResolverFactory resolverFactory;
-
@Reference
private MessagingProvider messagingProvider;
@@ -124,33 +94,34 @@ public class DistributionSubscriber implements DistributionAgent {
private Topics topics;
@Reference
- private EventAdmin eventAdmin;
-
- @Reference
private JournalAvailable journalAvailable;
@Reference(name = "precondition")
private Precondition precondition;
- @Reference
- private DistributionMetricsService distributionMetricsService;
+ @Reference(name = "packageBuilder")
+ private DistributionPackageBuilder packageBuilder;
@Reference
- private Packaging packaging;
+ private SubscriberMetrics subscriberMetrics;
- SubscriberIdle subscriberIdle;
+ @Reference
+ private SlingSettingsService slingSettings;
+ @Reference
+ BookKeeperFactory bookKeeperFactory;
+
private ServiceRegistration<DistributionAgent> componentReg;
private Closeable packagePoller;
private CommandPoller commandPoller;
- private BookKeeper bookKeeper;
-
+ BookKeeper bookKeeper;
+
// Use a bounded internal buffer to allow reading further packages while working
// on one at a time
- private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8);
+ private final BlockingQueue<FullMessage<PackageMessage>> queueItemsBuffer = new LinkedBlockingQueue<>(8);
private Set<String> queueNames = Collections.emptySet();
@@ -158,39 +129,30 @@ public class DistributionSubscriber implements DistributionAgent {
private String subAgentName;
- private String pkgType;
-
private volatile boolean running = true;
private volatile Thread queueProcessor;
@Activate
public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
- String subSlingId = requireNonNull(slingSettings.getSlingId());
subAgentName = requireNonNull(config.name());
requireNonNull(config);
requireNonNull(context);
+
requireNonNull(packageBuilder);
+ requireNonNull(subscriberMetrics);
requireNonNull(slingSettings);
- requireNonNull(resolverFactory);
requireNonNull(messagingProvider);
requireNonNull(topics);
- requireNonNull(eventAdmin);
requireNonNull(precondition);
+ requireNonNull(bookKeeperFactory);
- // Unofficial config (currently just for test)
- Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
- subscriberIdle = new SubscriberIdle(context, idleMillies);
-
queueNames = getNotEmpty(config.agentNames());
+ String subSlingId = requireNonNull(slingSettings.getSlingId());
int maxRetries = config.maxRetries();
boolean editable = config.editable();
-
- ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
- PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin,
- sender(topics.getStatusTopic()), subAgentName, subSlingId, editable, maxRetries);
-
+ PackageHandling packageHandling = config.packageHandling();
+ bookKeeper = bookKeeperFactory.create(packageBuilder, subAgentName, subSlingId, maxRetries, editable, packageHandling, sender(topics.getStatusTopic()));
long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
@@ -206,14 +168,11 @@ public class DistributionSubscriber implements DistributionAgent {
announcer = new Announcer(subSlingId, subAgentName, queueNames, sender(topics.getDiscoveryTopic()), bookKeeper,
maxRetries, config.editable(), announceDelay);
- pkgType = requireNonNull(packageBuilder.getType());
boolean errorQueueEnabled = (maxRetries >= 0);
String msg = format(
- "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s",
- subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled);
+ "Started Subscriber agent %s at offset %s, subscribed to agent names %s editable %s maxRetries %s errorQueueEnabled %s",
+ subAgentName, startOffset, queueNames, config.editable(), maxRetries, errorQueueEnabled);
LOG.info(msg);
- Dictionary<String, Object> props = createServiceProps(config);
- componentReg = context.registerService(DistributionAgent.class, this, props);
}
private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
@@ -225,24 +184,10 @@ public class DistributionSubscriber implements DistributionAgent {
return asList(agentNames).stream().filter(StringUtils::isNotBlank).collect(toSet());
}
- private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
- Dictionary<String, Object> props = new Hashtable<>();
- props.put("name", config.name());
- props.put("title", config.name());
- props.put("details", config.name());
- props.put("agentNames", config.agentNames());
- props.put("editable", config.editable());
- props.put("maxRetries", config.maxRetries());
- props.put("packageBuilder.target", config.packageBuilder_target());
- props.put("precondition.target", config.precondition_target());
- props.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
- return props;
- }
-
@Deactivate
public void deactivate() {
componentReg.unregister();
- IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
+ IOUtils.closeQuietly(announcer, bookKeeper,
packagePoller, commandPoller);
running = false;
Thread interrupter = this.queueProcessor;
@@ -250,71 +195,22 @@ public class DistributionSubscriber implements DistributionAgent {
interrupter.interrupt();
}
String msg = String.format(
- "Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
- subAgentName, queueNames, pkgType);
+ "Stopped Subscriber agent %s, subscribed to Publisher agent names %s",
+ subAgentName, queueNames);
LOG.info(msg);
}
-
- @Nonnull
- @Override
- public Iterable<String> getQueueNames() {
- return queueNames;
- }
-
- @Override
- public DistributionQueue getQueue(@Nonnull String queueName) {
- DistributionQueueItem head = queueItemsBuffer.stream()
- .filter(item -> isIn(queueName, item))
- .findFirst()
- .orElse(null);
- return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
- }
-
- private boolean isIn(String queueName, DistributionQueueItem queueItem) {
- PackageMessage packageMsg = queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
- return queueName.equals(packageMsg.getPubAgentName());
- }
-
- @Nonnull
- @Override
- public DistributionLog getLog() {
- return this::emptyDistributionLog;
- }
-
- private List<String> emptyDistributionLog() {
- return Collections.emptyList();
- }
-
+
@Nonnull
- @Override
public DistributionAgentState getState() {
- return AgentState.getState(this);
- }
-
- @Nonnull
- @Override
- public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
- return executeUnsupported(request);
+ return bookKeeper.getState();
}
-
- @Nonnull
- private DistributionResponse executeUnsupported(DistributionRequest request) {
- String msg = format("Request type %s is not supported by this agent, expected one of %s",
- request.getRequestType(), SUPPORTED_REQ_TYPES);
- LOG.info(msg);
- return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
- }
-
+
private void handlePackageMessage(MessageInfo info, PackageMessage message) {
if (shouldEnqueue(info, message)) {
- DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
+ FullMessage<PackageMessage> queueItem = new FullMessage<PackageMessage>(info, message);
enqueue(queueItem);
} else {
- try {
- bookKeeper.skipPackage(info.getOffset());
- } catch (PersistenceException | LoginException e) {
- LOG.info("Error marking message at offset {} as skipped", info.getOffset(), e);
- }
+ bookKeeper.skipPackage(info.getOffset());
}
}
@@ -323,23 +219,19 @@ public class DistributionSubscriber implements DistributionAgent {
LOG.info("Skipping package for Publisher agent {} at offset {} (not subscribed)", message.getPubAgentName(), info.getOffset());
return false;
}
- if (!pkgType.equals(message.getPkgType())) {
- LOG.warn("Skipping package with type {} at offset {}", message.getPkgType(), info.getOffset());
- return false;
- }
return true;
}
-
+
/**
* We block here if the buffer is full in order to limit the number of binary
* packages fetched in memory. Note that each queued item contains the binary
* package to be imported.
*/
- private void enqueue(DistributionQueueItem queueItem) {
+ private void enqueue(FullMessage<PackageMessage> queueItem) {
try {
while (running) {
if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) {
- distributionMetricsService.getItemsBufferSize().increment();
+ subscriberMetrics.getItemsBufferSize().increment();
return;
}
}
@@ -349,7 +241,7 @@ public class DistributionSubscriber implements DistributionAgent {
throw new RuntimeException();
}
}
-
+
private void processQueue() {
LOG.info("Started Queue processor");
while (!Thread.interrupted()) {
@@ -361,37 +253,27 @@ public class DistributionSubscriber implements DistributionAgent {
}
LOG.info("Stopped Queue processor");
}
-
+
private void fetchAndProcessQueueItem() throws InterruptedException {
try {
-
- bookKeeper.sendStoredStatus();
- DistributionQueueItem item = blockingPeekQueueItem();
-
- try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
- processQueueItem(item);
- } finally {
- subscriberIdle.idle();
+ FullMessage<PackageMessage> item = blockingPeekQueueItem();
+ try (Timer.Context context = subscriberMetrics.getProcessQueueItemDuration().time()) {
+ bookKeeper.processPackage(item.getInfo(), item.getMessage(), this::shouldRemove);
}
-
- } catch (TimeoutException e) {
- /**
- * Precondition timed out. We only log this on info level as it is no error
- */
+ queueItemsBuffer.remove();
+ } catch (PreConditionTimeoutException e) {
LOG.info(e.getMessage());
- Thread.sleep(RETRY_DELAY);
- } catch (InterruptedException e) {
- throw e;
+ retryDelay();
} catch (Exception e) {
// Catch all to prevent processing from stopping
- LOG.error("Error processing queue item", e);
- Thread.sleep(RETRY_DELAY);
+ LOG.warn("Error processing queue item", e);
+ retryDelay();
}
}
-
- private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
+
+ private FullMessage<PackageMessage> blockingPeekQueueItem() throws InterruptedException {
while (true) {
- DistributionQueueItem queueItem = queueItemsBuffer.peek();
+ FullMessage<PackageMessage> queueItem = queueItemsBuffer.peek();
if (queueItem != null) {
return queueItem;
} else {
@@ -400,23 +282,38 @@ public class DistributionSubscriber implements DistributionAgent {
}
}
- private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException {
- long offset = queueItem.get(RECORD_OFFSET, Long.class);
- PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
- boolean skip = shouldSkip(offset);
- subscriberIdle.busy();
- if (skip) {
- bookKeeper.removePackage(pkgMsg, offset);
- } else {
- long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
- bookKeeper.importPackage(pkgMsg, offset, createdTime);
+ private void retryDelay() {
+ try {
+ Thread.sleep(RETRY_DELAY);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private boolean shouldRemove(long offset, PackageMessage message) {
+ if (commandPoller.isCleared(offset)) {
+ return true;
}
- queueItemsBuffer.remove();
- distributionMetricsService.getItemsBufferSize().decrement();
+ return waitPrecondition(offset);
}
- private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException {
- return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
+ private boolean waitPrecondition(long offset) {
+ Decision decision = Decision.WAIT;
+ long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000;
+ while (decision == Decision.WAIT && System.currentTimeMillis() < endTime) {
+ decision = precondition.canProcess(subAgentName, offset);
+ if (decision == Decision.WAIT) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ } else {
+ return decision == Decision.SKIP ? true : false;
+ }
+ }
+ throw new PreConditionTimeoutException("Timeout waiting for package offset " + offset + " on status topic.");
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
similarity index 79%
copy from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
copy to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
index 55dc55e..684ab9d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
@@ -18,6 +18,11 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-public enum PackageHandling {
- Off, Extract, Install
+public class PreConditionTimeoutException extends RuntimeException {
+ public PreConditionTimeoutException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 6286011641627241560L;
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..625a62b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import org.apache.sling.distribution.journal.service.subscriber.PackageHandling;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
similarity index 82%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
index 53a7245..001fafc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -41,14 +42,13 @@ import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics.GaugeService;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -85,9 +85,11 @@ public class BookKeeper implements Closeable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ResourceResolverFactory resolverFactory;
- private final DistributionMetricsService distributionMetricsService;
+ private final SubscriberMetrics subscriberMetrics;
private final PackageHandler packageHandler;
+ public final SubscriberIdle subscriberIdle;
private final EventAdmin eventAdmin;
+
private final Consumer<PackageStatusMessage> sender;
private final boolean editable;
private final int maxRetries;
@@ -101,9 +103,10 @@ public class BookKeeper implements Closeable {
private GaugeService<Integer> retriesGauge;
private int skippedCounter = 0;
- public BookKeeper(ResourceResolverFactory resolverFactory,
- DistributionMetricsService distributionMetricsService,
+ BookKeeper(ResourceResolverFactory resolverFactory,
+ SubscriberMetrics subscriberMetrics,
PackageHandler packageHandler,
+ SubscriberIdle subscriberIdle,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
String subAgentName,
@@ -111,11 +114,12 @@ public class BookKeeper implements Closeable {
boolean editable,
int maxRetries) {
this.packageHandler = packageHandler;
+ this.subscriberIdle = subscriberIdle;
this.eventAdmin = eventAdmin;
- String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
- this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+ String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
+ this.subscriberMetrics = subscriberMetrics;
+ this.retriesGauge = subscriberMetrics.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
this.resolverFactory = resolverFactory;
- this.distributionMetricsService = distributionMetricsService;
this.sender = sender;
this.subAgentName = subAgentName;
this.subSlingId = subSlingId;
@@ -128,6 +132,31 @@ public class BookKeeper implements Closeable {
this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
}
+ public void processPackage(MessageInfo info, PackageMessage pkgMsg, BiPredicate<Long, PackageMessage> shouldRemove) throws Exception {
+ try {
+ sendStoredStatus();
+ long offset = info.getOffset();
+ boolean remove = shouldRemove.test(offset, pkgMsg);
+ subscriberIdle.busy();
+ if (remove) {
+ removePackage(pkgMsg, offset);
+ } else {
+ long createdTime = info.getCreateTime();
+ importPackage(pkgMsg, offset, createdTime);
+ }
+ } finally {
+ subscriberIdle.idle();
+ sendStoredStatus();
+ }
+ }
+
+ public DistributionAgentState getState() {
+ if (subscriberIdle.isIdle()) {
+ return DistributionAgentState.IDLE;
+ }
+ return packageRetries.getSum() > 0 ? DistributionAgentState.BLOCKED : DistributionAgentState.RUNNING;
+ }
+
/**
* We aim at processing the packages exactly once. Processing the packages
* exactly once is possible with the following conditions
@@ -144,11 +173,11 @@ public class BookKeeper implements Closeable {
* failing. For those packages importers, we aim at processing packages at least
* once, thanks to the order in which the content updates are applied.
*/
- public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
+ private void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
log.info("Importing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
addPackageMDC(pkgMsg);
- try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
+ try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
if (editable) {
@@ -156,10 +185,10 @@ public class BookKeeper implements Closeable {
}
storeOffset(importerResolver, offset);
importerResolver.commit();
- distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
- distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
+ subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
+ subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS);
packageRetries.clear(pkgMsg.getPubAgentName());
- Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
+ Event event = ImportedEventFactory.create(pkgMsg, subAgentName);
eventAdmin.postEvent(event);
} catch (LoginException | IOException | RuntimeException e) {
failure(pkgMsg, offset, e);
@@ -195,7 +224,7 @@ public class BookKeeper implements Closeable {
* @throws DistributionException if the package should be retried
*/
private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
- distributionMetricsService.getFailedPackageImports().mark();
+ subscriberMetrics.getFailedPackageImports().mark();
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
@@ -210,10 +239,10 @@ public class BookKeeper implements Closeable {
}
}
- public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
+ private void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
log.info("Removing distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
+ Timer.Context context = subscriberMetrics.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
storeStatus(resolver, new PackageStatus(REMOVED, offset, pkgMsg.getPubAgentName()));
@@ -225,17 +254,19 @@ public class BookKeeper implements Closeable {
context.stop();
}
- public void skipPackage(long offset) throws LoginException, PersistenceException {
+ public void skipPackage(long offset) {
log.info("Skipping package at offset {}", offset);
if (shouldCommitSkipped()) {
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeOffset(resolver, offset);
resolver.commit();
+ } catch (Exception e) {
+ log.warn("");
}
}
}
- public synchronized boolean shouldCommitSkipped() {
+ private synchronized boolean shouldCommitSkipped() {
skippedCounter ++;
if (skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
skippedCounter = 1;
@@ -249,8 +280,8 @@ public class BookKeeper implements Closeable {
* Send status stored in a previous run if exists
* @throws InterruptedException
*/
- public void sendStoredStatus() throws InterruptedException {
- try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+ private void sendStoredStatus() throws InterruptedException {
+ try (Timer.Context context = subscriberMetrics.getSendStoredStatusDuration().time()) {
PackageStatus status = new PackageStatus(statusStore.load());
boolean sent = status.sent;
int retry = 0;
@@ -286,7 +317,7 @@ public class BookKeeper implements Closeable {
log.info("Sent status message {}", pkgStatMsg);
}
- public void markStatusSent() {
+ private void markStatusSent() {
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
statusStore.store(resolver, "sent", true);
resolver.commit();
@@ -303,10 +334,6 @@ public class BookKeeper implements Closeable {
return packageRetries.get(pubAgentName);
}
- public PackageRetries getPackageRetries() {
- return packageRetries;
- }
-
@Override
public void close() throws IOException {
IOUtils.closeQuietly(retriesGauge);
@@ -315,7 +342,7 @@ public class BookKeeper implements Closeable {
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
log.info("Removing failed distribution package {} of type {} at offset {}",
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
- Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
+ Timer.Context context = subscriberMetrics.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
storeOffset(resolver, offset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
new file mode 100644
index 0000000..b0cdbcb
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.jackrabbit.vault.packaging.Packaging;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
+
+@Component(service = BookKeeperFactory.class, immediate = true)
+public class BookKeeperFactory {
+
+
+
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ @Reference
+ private SubscriberMetrics subscriberMetrics;
+
+ @Reference
+ private Packaging packaging;
+
+ private BundleContext context;
+
+ private Integer idleMillies;
+
+ public BookKeeperFactory() {
+ }
+
+ public BookKeeperFactory(ResourceResolverFactory resolverFactory,
+ EventAdmin eventAdmin, SubscriberMetrics subscriberMetrics, Packaging packaging) {
+ this.resolverFactory = resolverFactory;
+ this.eventAdmin = eventAdmin;
+ this.subscriberMetrics = subscriberMetrics;
+ this.packaging = packaging;
+ }
+
+ public void activate(BundleContext context, Map<String, Object> properties) {
+ this.context = context;
+ this.idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+ requireNonNull(resolverFactory);
+ requireNonNull(eventAdmin);
+ requireNonNull(subscriberMetrics);
+ }
+
+ public BookKeeper create(
+ DistributionPackageBuilder packageBuilder,
+ String subAgentName,
+ String subSlingId,
+ int maxRetries,
+ boolean editable,
+ PackageHandling packageHandling,
+ Consumer<PackageStatusMessage> sender
+ ) {
+ ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, packageHandling);
+ PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
+ SubscriberIdle subscriberIdle = new SubscriberIdle(context, this.idleMillies);
+ return new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin,
+ sender, subAgentName, subSlingId, editable, maxRetries);
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
index 998f929..c626550 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.util.Objects.requireNonNull;
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
* Each distribution package is inspected for possible content packages in /etc/packages.
* Such content packages are installed via the Packaging service.
*/
-public class ContentPackageExtractor {
+class ContentPackageExtractor {
private static final String PACKAGE_BASE_PATH = "/etc/packages/";
private final Logger log = LoggerFactory.getLogger(this.getClass());
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
new file mode 100644
index 0000000..37a4734
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/ImportedEventFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_KIND;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_COMPONENT_NAME;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_PATHS;
+import static org.apache.sling.distribution.event.DistributionEventProperties.DISTRIBUTION_TYPE;
+import static org.apache.sling.distribution.event.DistributionEventTopics.IMPORTER_PACKAGE_IMPORTED;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.osgi.service.event.Event;
+
+@ParametersAreNonnullByDefault
+class ImportedEventFactory {
+
+ public static final String PACKAGE_ID = "distribution.package.id";
+ private static final String KIND_IMPORTER = "importer";
+
+ private ImportedEventFactory() {
+ }
+
+ public static Event create(Messages.PackageMessage pkgMsg, String agentName) {
+ String[] pathsList = pkgMsg.getPathsList().toArray(new String[0]);
+ Map<String, Object> props = new HashMap<>();
+ props.put(DISTRIBUTION_COMPONENT_KIND, KIND_IMPORTER);
+ props.put(DISTRIBUTION_COMPONENT_NAME, agentName);
+ props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
+ props.put(DISTRIBUTION_PATHS, pathsList);
+ props.put(PACKAGE_ID, pkgMsg.getPkgId());
+ return new Event(IMPORTER_PACKAGE_IMPORTED, props);
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
index c619d79..0018641 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/LocalStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
@@ -44,7 +44,7 @@ import static java.util.Objects.requireNonNull;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
@ParametersAreNonnullByDefault
-public class LocalStore {
+class LocalStore {
private static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
diff --git a/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
new file mode 100644
index 0000000..375a792
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/NoopMetric.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.service.subscriber;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.Timer;
+
+enum NoopMetric implements Counter, Histogram, Timer, Meter{
+ INSTANCE;
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+
+ @Override
+ public void increment() {
+
+ }
+
+ @Override
+ public void decrement() {
+
+ }
+
+ @Override
+ public void increment(long n) {
+
+ }
+
+ @Override
+ public void decrement(long n) {
+
+ }
+
+ @Override
+ public void mark() {
+
+ }
+
+ @Override
+ public void mark(long n) {
+
+ }
+
+ @Override
+ public void update(long duration, TimeUnit unit) {
+
+ }
+
+ @Override
+ public Context time() {
+ return NoopContext.INSTANCE;
+ }
+
+ @Override
+ public void update(long value) {
+
+ }
+
+ @Override
+ public <AdapterType> AdapterType adaptTo(Class<AdapterType> type) {
+ return null;
+ }
+
+ private enum NoopContext implements Context {
+ INSTANCE;
+
+ @Override
+ public long stop() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
similarity index 64%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
index a7148c4..ef832f3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandler.java
@@ -16,24 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.util.Objects;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Binary;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PackageHandler {
+class PackageHandler {
private static final Logger LOG = LoggerFactory.getLogger(PackageHandler.class);
private DistributionPackageBuilder packageBuilder;
@@ -66,13 +74,38 @@ public class PackageHandler {
LOG.info("Importing paths {}",pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
- pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);
- packageBuilder.installPackage(resolver, pkgStream);
- extractor.handle(resolver, pkgMsg.getPathsList());
+ pkgStream = pkgStream(resolver, pkgMsg);
+ if (canHandlePackage(pkgMsg)) {
+ packageBuilder.installPackage(resolver, pkgStream);
+ extractor.handle(resolver, pkgMsg.getPathsList());
+ }
} finally {
IOUtils.closeQuietly(pkgStream);
}
+ }
+ private boolean canHandlePackage(PackageMessage pkgMsg) {
+ return Objects.equals(packageBuilder.getType(), pkgMsg.getPkgType());
+ }
+
+ @Nonnull
+ public static InputStream pkgStream(ResourceResolver resolver, PackageMessage pkgMsg) throws DistributionException {
+ if (pkgMsg.hasPkgBinary()) {
+ return new ByteArrayInputStream(pkgMsg.getPkgBinary().toByteArray());
+ } else {
+ String pkgBinRef = pkgMsg.getPkgBinaryRef();
+ try {
+ Session session = resolver.adaptTo(Session.class);
+ if (session == null) {
+ throw new DistributionException("Unable to get Oak session");
+ }
+ ValueFactory factory = session.getValueFactory();
+ Binary binary = factory.createValue(new SimpleReferenceBinary(pkgBinRef)).getBinary();
+ return binary.getStream();
+ } catch (RepositoryException e) {
+ throw new DistributionException(e.getMessage(), e);
+ }
+ }
}
private void installDeletePackage(ResourceResolver resolver, PackageMessage pkgMsg)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
similarity index 92%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
index 55dc55e..a1130fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandling.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageHandling.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
public enum PackageHandling {
Off, Extract, Install
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
similarity index 94%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
index 23f9463..1466d85 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetries.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +27,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
* Holds package retries by agent name
*/
@ParametersAreNonnullByDefault
-public class PackageRetries {
+class PackageRetries {
// (pubAgentName x retries)
private final Map<String, Integer> pubAgentNameToRetries = new ConcurrentHashMap<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
rename to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
index a05b80c..e72378d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdle.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.io.Closeable;
import java.util.Hashtable;
@@ -42,6 +42,7 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
private final int idleMillis;
private final AtomicBoolean isReady = new AtomicBoolean();
+ private final AtomicBoolean idle = new AtomicBoolean();
private final ScheduledExecutorService executor;
private ScheduledFuture<?> schedule;
@@ -69,17 +70,23 @@ public class SubscriberIdle implements SystemReadyCheck, Closeable {
* Called when processing of a message starts
*/
public synchronized void busy() {
+ idle.set(false);
cancelSchedule();
}
public boolean isReady() {
return isReady.get();
}
+
+ public boolean isIdle() {
+ return idle.get();
+ }
/**
* Called when processing of a message has finished
*/
public synchronized void idle() {
+ idle.set(true);
if (!isReady.get()) {
cancelSchedule();
schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
similarity index 61%
copy from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
copy to src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
index f18c04c..f5bcae0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberMetrics.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.lang.String.format;
import java.io.Closeable;
import java.util.Dictionary;
import java.util.Hashtable;
-import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.sling.commons.metrics.Counter;
@@ -41,13 +40,11 @@ import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component(service = DistributionMetricsService.class)
-public class DistributionMetricsService {
+@Component(service = SubscriberMetrics.class)
+public class SubscriberMetrics {
public static final String BASE_COMPONENT = "distribution.journal";
- public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -55,18 +52,8 @@ public class DistributionMetricsService {
@Reference
private MetricsService metricsService;
- private Counter cleanupPackageRemovedCount;
-
- private Timer cleanupPackageDuration;
-
private Histogram importedPackageSize;
- private Histogram exportedPackageSize;
-
- private Meter acceptedRequests;
-
- private Meter droppedRequests;
-
private Counter itemsBufferSize;
private Timer removedPackageDuration;
@@ -83,26 +70,23 @@ public class DistributionMetricsService {
private Timer packageDistributedDuration;
- private Timer buildPackageDuration;
-
- private Timer enqueuePackageDuration;
-
- private Counter queueCacheFetchCount;
-
private BundleContext context;
+
+ public SubscriberMetrics() {
+ importedPackageSize = NoopMetric.INSTANCE;
+ itemsBufferSize = NoopMetric.INSTANCE;
+ importedPackageDuration = NoopMetric.INSTANCE;
+ removedPackageDuration = NoopMetric.INSTANCE;
+ removedFailedPackageDuration = NoopMetric.INSTANCE;
+ failedPackageImports = NoopMetric.INSTANCE;
+ sendStoredStatusDuration = NoopMetric.INSTANCE;
+ processQueueItemDuration = NoopMetric.INSTANCE;
+ packageDistributedDuration = NoopMetric.INSTANCE;
+ }
@Activate
public void activate(BundleContext context) {
this.context = context;
- cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
- cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
- exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
- acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests"));
- droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests"));
- buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
- enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
- queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
@@ -115,56 +99,6 @@ public class DistributionMetricsService {
}
/**
- * Runs provided code updating provided metric
- * with its execution time.
- * The method guarantees that the metric is updated
- * even if the code throws an exception
- * @param metric metric to update
- * @param code code to clock
- * @throws Exception actually it doesn't
- */
- public static void timed(Timer metric, Runnable code) throws Exception {
- try (Timer.Context ignored = metric.time()) {
- code.run();
- }
- }
-
- /**
- * Runs provided code updating provided metric
- * with its execution time.
- * The method guarantees that the metric is updated
- * even if the code throws an exception
- * @param metric metric to update
- * @param code code to clock
- * @return a value returned but <code>code.call()</code> invocation
- * @throws Exception if underlying code throws
- */
- public static <T> T timed(Timer metric, Callable<T> code) throws Exception {
- try (Timer.Context ignored = metric.time()) {
- return code.call();
- }
- }
-
- /**
- * Counter of package removed during the Package Cleanup Task.
- * The count is the sum of all packages removed since the service started.
- *
- * @return a Sling Metrics timer
- */
- public Counter getCleanupPackageRemovedCount() {
- return cleanupPackageRemovedCount;
- }
-
- /**
- * Timer of the Package Cleanup Task execution duration.
- *
- * @return a Sling Metrics timer
- */
- public Timer getCleanupPackageDuration() {
- return cleanupPackageDuration;
- }
-
- /**
* Histogram of the imported content package size in Byte.
*
* @return a Sling Metrics histogram
@@ -174,33 +108,6 @@ public class DistributionMetricsService {
}
/**
- * Histogram of the exported content package size in Bytes.
- *
- * @return a Sling Metrics histogram
- */
- public Histogram getExportedPackageSize() {
- return exportedPackageSize;
- }
-
- /**
- * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state.
- *
- * @return a Sling Metrics meter
- */
- public Meter getAcceptedRequests() {
- return acceptedRequests;
- }
-
- /**
- * Meter of requests returning an {@code DistributionRequestState.DROPPED} state.
- *
- * @return a Sling Metrics meter
- */
- public Meter getDroppedRequests() {
- return droppedRequests;
- }
-
- /**
* Counter of the package buffer size on the subscriber.
*
* @return a Sling Metrics counter
@@ -273,33 +180,6 @@ public class DistributionMetricsService {
return packageDistributedDuration;
}
- /**
- * Timer capturing the duration in ms of building a content package
- *
- * @return a Sling Metric timer
- */
- public Timer getBuildPackageDuration() {
- return buildPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of adding a package to the queue
- *
- * @return a Sling Metric timer
- */
- public Timer getEnqueuePackageDuration() {
- return enqueuePackageDuration;
- }
-
- /**
- * Counter of fetch operations to feed the queue cache.
- *
- * @return a Sling Metric counter
- */
- public Counter getQueueCacheFetchCount() {
- return queueCacheFetchCount;
- }
-
public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
return new GaugeService<>(name, description, supplier);
}
@@ -327,7 +207,7 @@ public class DistributionMetricsService {
public class GaugeService<T> implements Gauge<T>, Closeable {
@SuppressWarnings("rawtypes")
- private final ServiceRegistration<Gauge> reg;
+ private ServiceRegistration<Gauge> reg;
private final Supplier<T> supplier;
private GaugeService(String name, String description, Supplier<T> supplier) {
@@ -336,7 +216,9 @@ public class DistributionMetricsService {
props.put(Constants.SERVICE_DESCRIPTION, description);
props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
props.put(Gauge.NAME, name);
- reg = context.registerService(Gauge.class, this, props);
+ if (context != null) {
+ reg = context.registerService(Gauge.class, this, props);
+ }
}
@Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
index 858e8dc..51d5466 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/AgentState.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
index bb49514..80c3747 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DefaultDistributionLog.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DefaultDistributionLog.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
similarity index 68%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index f18c04c..4dd1167 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.lang.String.format;
@@ -48,8 +48,6 @@ public class DistributionMetricsService {
public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
- public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
-
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Reference
@@ -59,30 +57,12 @@ public class DistributionMetricsService {
private Timer cleanupPackageDuration;
- private Histogram importedPackageSize;
-
private Histogram exportedPackageSize;
private Meter acceptedRequests;
private Meter droppedRequests;
- private Counter itemsBufferSize;
-
- private Timer removedPackageDuration;
-
- private Timer removedFailedPackageDuration;
-
- private Timer importedPackageDuration;
-
- private Meter failedPackageImports;
-
- private Timer sendStoredStatusDuration;
-
- private Timer processQueueItemDuration;
-
- private Timer packageDistributedDuration;
-
private Timer buildPackageDuration;
private Timer enqueuePackageDuration;
@@ -102,16 +82,6 @@ public class DistributionMetricsService {
buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration"));
enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration"));
queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count"));
-
- importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size"));
- itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size"));
- importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration"));
- removedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_package_duration"));
- removedFailedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "removed_failed_package_duration"));
- failedPackageImports = getMeter(getMetricName(SUB_COMPONENT, "failed_package_imports"));
- sendStoredStatusDuration = getTimer(getMetricName(SUB_COMPONENT, "send_stored_status_duration"));
- processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration"));
- packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration"));
}
/**
@@ -165,15 +135,6 @@ public class DistributionMetricsService {
}
/**
- * Histogram of the imported content package size in Byte.
- *
- * @return a Sling Metrics histogram
- */
- public Histogram getImportedPackageSize() {
- return importedPackageSize;
- }
-
- /**
* Histogram of the exported content package size in Bytes.
*
* @return a Sling Metrics histogram
@@ -201,79 +162,6 @@ public class DistributionMetricsService {
}
/**
- * Counter of the package buffer size on the subscriber.
- *
- * @return a Sling Metrics counter
- */
- public Counter getItemsBufferSize() {
- return itemsBufferSize;
- }
-
- /**
- * Timer capturing the duration in ms of successful packages import operations.
- *
- * @return a Sling Metrics timer
- */
- public Timer getImportedPackageDuration() {
- return importedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed from an editable subscriber.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedPackageDuration() {
- return removedPackageDuration;
- }
-
- /**
- * Timer capturing the duration in ms of packages successfully removed automatically from a subscriber supporting error queue.
- *
- * @return a Sling Metrics timer
- */
- public Timer getRemovedFailedPackageDuration() {
- return removedFailedPackageDuration;
- }
-
- /**
- * Meter of failures to import packages.
- *
- * @return a Sling Metrics meter
- */
- public Meter getFailedPackageImports() {
- return failedPackageImports;
- }
-
- /**
- * Timer capturing the duration in ms of sending a stored package status.
- *
- * @return a Sling Metric timer
- */
- public Timer getSendStoredStatusDuration() {
- return sendStoredStatusDuration;
- }
-
- /**
- * Timer capturing the duration in ms of processing a queue item.
- *
- * @return a Sling Metric timer
- */
- public Timer getProcessQueueItemDuration() {
- return processQueueItemDuration;
- }
-
- /**
- * Timer capturing the duration in ms of distributing a distribution package.
- * The timer starts when the package is enqueued and stops when the package is successfully imported.
- *
- * @return a Sling Metric timer
- */
- public Timer getPackageDistributedDuration() {
- return packageDistributedDuration;
- }
-
- /**
* Timer capturing the duration in ms of building a content package
*
* @return a Sling Metric timer
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
index 529fc52..ed2d461 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/EntryUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
index 28f5aa8..48727a7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/ExponentialBackOff.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e4f36c9..a1e3924 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JMXRegistration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 3582b36..6cb5170 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Objects.requireNonNull;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
index 287f3fa..9478372 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableServiceMarker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableServiceMarker.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.util.Objects;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
index 2bb30b5..18cf976 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/LimitPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/LimitPoller.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
index b9d1d2d..72eb177 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowser.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageBrowser.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.singletonMap;
import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
similarity index 99%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
index 30f8836..8ac9ba5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPlugin.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/PackageViewerPlugin.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import java.io.IOException;
import java.io.PrintWriter;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
similarity index 97%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
index 47a9f25..58a0717 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/QueueEntryFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.shared;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERROR;
import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
similarity index 96%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
index e49700e..eb0510c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponse.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponse.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
similarity index 98%
rename from src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
rename to src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
index 1f4d44a..c9ce5e2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index f279265..7808b12 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -22,12 +22,13 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.junit.Test;
public class DefaultPreconditionTest {
@Test
public void testAlwaysTrue() {
- boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10);
- assertThat(canProcess, equalTo(true));
+ Decision decision = new DefaultPrecondition().canProcess("any", 100);
+ assertThat(decision, equalTo(Decision.ACCEPT));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 571d98a..1484a9e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -19,10 +19,10 @@
package org.apache.sling.distribution.journal.impl.precondition;
import org.apache.sling.distribution.journal.impl.precondition.PackageStatusWatcher;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index eab0479..a5e969b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,26 +19,21 @@
package org.apache.sling.distribution.journal.impl.precondition;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
@@ -89,47 +84,28 @@ public class StagingPreconditionTest {
statusHandler = statusCaptor.getValue().getHandler();
}
- @Test(expected = IllegalArgumentException.class)
- public void testIllegalTimeout() throws InterruptedException, TimeoutException {
- precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
- }
-
- @Test(expected = TimeoutException.class)
+ @Test
public void testNotYetProcessed() throws InterruptedException, TimeoutException {
simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED);
- boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1);
- assertThat(res, equalTo(true));
+ Decision res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT);
+ assertThat(res, equalTo(Decision.WAIT));
+
+ Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT);
+ assertThat(res2, equalTo(Decision.WAIT));
- // We got no package for this agent. So this should time out
- precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
}
@Test
- public void testDeactivateDuringCanProcess() {
- AtomicReference<Throwable> exHolder = new AtomicReference<>();
- Thread th = new Thread(() -> {
- try {
- precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2);
- } catch (Throwable t) {
- exHolder.set(t);
- }
- });
- th.start();
- precondition.deactivate();
- Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue());
- assertThat(ex, instanceOf(InterruptedException.class));
- }
-
- @Test(expected = TimeoutException.class)
public void testCleanup() throws InterruptedException, TimeoutException {
simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
- assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+ Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+ assertThat(res, equalTo(Decision.ACCEPT));
// Cleanup
precondition.run();
- // Should time out because after cleanup message is not present anymore
- precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+ Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+ assertThat(res2, equalTo(Decision.WAIT));
}
@Test
@@ -138,9 +114,9 @@ public class StagingPreconditionTest {
simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED);
simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED);
- assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
- assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
- assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), equalTo(Decision.SKIP));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), equalTo(Decision.SKIP));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), equalTo(Decision.ACCEPT));
}
private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 5e7b76c..2b82486 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -30,8 +30,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +38,8 @@ import org.mockito.Mockito;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
index 2a41a2f..f52026c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMXTest.java
@@ -41,6 +41,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
@@ -53,8 +54,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
-
@SuppressWarnings("rawtypes")
public class DistPublisherJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7ab2bdf..434c561 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -38,9 +38,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import com.google.common.collect.ImmutableMap;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index acc91b0..ab5d17e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -21,9 +21,9 @@ package org.apache.sling.distribution.journal.impl.publisher;
import java.util.Collections;
import java.util.HashSet;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
index a8fb467..9b41463 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepoTest.java
@@ -30,8 +30,6 @@ import java.util.List;
import java.util.UUID;
import java.util.function.Function;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -57,6 +55,8 @@ import org.mockito.Spy;
import org.osgi.framework.BundleContext;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
index 6aeb1ba..e3b5582 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactoryTest.java
@@ -31,13 +31,13 @@ import static org.hamcrest.Matchers.hasItemInArray;
import static org.junit.Assert.assertThat;
import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.MessageInfo;
public class QueueItemFactoryTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
index 11a52f9..5aededc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import java.util.HashMap;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
index 18cc957..e7f9639 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/OffsetQueueImplJMXTest.java
@@ -34,7 +34,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.junit.Test;
public class OffsetQueueImplJMXTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 1113319..5fca8a3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -57,10 +57,10 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 61c1389..166ad1a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,7 +38,6 @@ import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import com.google.protobuf.GeneratedMessage;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -58,6 +57,7 @@ import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 20dccea..211b9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -37,6 +37,7 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.shared.EntryUtil;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
index 778bbaa..3cef383 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
@@ -28,7 +28,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +40,7 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
deleted file mode 100644
index d2784e9..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueueTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import com.google.common.collect.Lists;
-
-public class SubQueueTest {
-
- @Test
- public void testGetName() throws Exception {
- String queueName = "someQueue";
- SubQueue queue = new SubQueue(queueName, null, new PackageRetries());
- Assert.assertEquals(queueName, queue.getName());
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testAdd() throws Exception {
- SubQueue queue = new SubQueue("someQueue", null, new PackageRetries());
- queue.add(buildQueueItem("package-1"));
- }
-
- @Test
- public void testGetHead() throws Exception {
- SubQueue emptyQueue = new SubQueue("emptyQueue", null, new PackageRetries());
- Assert.assertNull(emptyQueue.getHead());
- SubQueue oneQueue = new SubQueue("oneQueue", buildQueueItem("1"), new PackageRetries());
- Assert.assertNotNull(oneQueue.getHead());
- }
-
- @Test
- public void testGetItems() throws Exception {
- SubQueue oneQueue = new SubQueue("oneQueue", null, new PackageRetries());
- Assert.assertNotNull(oneQueue.getEntries(0, 10));
- SubQueue tenQueue = new SubQueue("tenQueue", buildQueueItem("1"), new PackageRetries());
- Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, 10)).size());
- Assert.assertEquals(1, Lists.newArrayList(tenQueue.getEntries(0, -1)).size());
- Assert.assertEquals(0, Lists.newArrayList(tenQueue.getEntries(1, 10)).size());
- }
-
- private DistributionQueueItem buildQueueItem(String packageId) {
- Map<String, Object> properties = new HashMap<>();
- properties.put(QueueItemFactory.RECORD_TOPIC, "topic");
- properties.put(QueueItemFactory.RECORD_OFFSET, 0);
- properties.put(QueueItemFactory.RECORD_PARTITION, 0);
- properties.put(QueueItemFactory.RECORD_TIMESTAMP, System.currentTimeMillis());
- return new DistributionQueueItem(packageId, properties);
- }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
index 05f8a88..e669a82 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/AnnouncerTest.java
@@ -27,7 +27,9 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.function.Consumer;
+import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeper;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index d6c774e..0de3d2e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -33,10 +33,10 @@ import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..4d0710c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,12 +21,9 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -40,42 +37,39 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Dictionary;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.impl.shared.Topics;
-import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
-import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
-import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
-import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.service.subscriber.BookKeeperFactory;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
@@ -83,34 +77,26 @@ import org.awaitility.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
-
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
@SuppressWarnings("unchecked")
+@RunWith(MockitoJUnitRunner.class)
public class SubscriberTest {
private static final String SUB1_SLING_ID = "sub1sling";
@@ -119,7 +105,7 @@ public class SubscriberTest {
private static final String PUB1_SLING_ID = "pub1sling";
private static final String PUB1_AGENT_NAME = "pub1agent";
- private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
+ public static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.newBuilder()
.setPkgId("myid")
.setPubSlingId(PUB1_SLING_ID)
.setPubAgentName(PUB1_AGENT_NAME)
@@ -138,6 +124,8 @@ public class SubscriberTest {
.addAllPaths(Arrays.asList("/test"))
.build();
+ @Mock
+ Packaging packaging;
@Mock
private BundleContext context;
@@ -172,8 +160,10 @@ public class SubscriberTest {
@Mock
private MessageSender<PackageStatusMessage> statusSender;
- @Mock
- private DistributionMetricsService distributionMetricsService;
+ @Spy
+ private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
+
+ BookKeeperFactory bookKeeperFactory;
@InjectMocks
DistributionSubscriber subscriber;
@@ -198,12 +188,14 @@ public class SubscriberTest {
Awaitility.setDefaultPollDelay(Duration.ZERO);
Awaitility.setDefaultPollInterval(Duration.ONE_HUNDRED_MILLISECONDS);
- MockitoAnnotations.initMocks(this);
+
+ bookKeeperFactory = new BookKeeperFactory(resolverFactory, eventAdmin, subscriberMetrics, packaging);
+ Map<String, Object> props = new HashMap<String, Object>();
+ bookKeeperFactory.activate(context, props);
+ subscriber.bookKeeperFactory = bookKeeperFactory;
when(packageBuilder.getType()).thenReturn("journal");
when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
- mockMetrics();
-
when(clientProvider.<PackageStatusMessage>createSender()).thenReturn(statusSender, (MessageSender) discoverySender);
when(clientProvider.createPoller(
Mockito.anyString(),
@@ -211,8 +203,7 @@ public class SubscriberTest {
Mockito.anyString(),
packageCaptor.capture()))
.thenReturn(poller);
- when(context.registerService(Mockito.any(Class.class), (DistributionAgent) eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg);
-
+
// you should call initSubscriber in each test method
}
@@ -227,10 +218,6 @@ public class SubscriberTest {
assumeNoPrecondition();
initSubscriber();
- assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
- assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(), equalTo(DistributionQueueState.IDLE));
- assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE));
-
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
@@ -239,20 +226,13 @@ public class SubscriberTest {
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class))
).thenAnswer(new WaitFor(sem));
- packageHandler.handle(info, message);
-
+ packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
- DistributionQueueEntry item = queue.getHead();
- assertThat(item.getStatus().getItemState(), equalTo(DistributionQueueItemState.QUEUED));
sem.release();
waitSubscriber(IDLE);
verify(statusSender, times(0)).send(eq(topics.getStatusTopic()),
anyObject());
- List<String> log = subscriber.getLog().getLines();
- // We do not use the DistributionLog anymore
- assertThat(log.size(), equalTo(0));
}
@Test
@@ -268,25 +248,10 @@ public class SubscriberTest {
PackageMessage message = BASIC_DEL_PACKAGE;
packageHandler.handle(info, message);
- waitSubscriber(RUNNING);
- waitSubscriber(IDLE);
- try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
- assertThat(resolver.getResource("/test"), nullValue());
- }
+ await().until(() -> getResource("/test"), nullValue());
}
@Test
- public void testExecuteNotSupported() throws DistributionException {
- assumeNoPrecondition();
- initSubscriber();
-
- DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "test");
- DistributionResponse response = subscriber.execute(resourceResolver, request);
- assertThat(response.getState(), equalTo(DistributionRequestState.DROPPED));
- }
-
-
- @Test
public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();
initSubscriber(ImmutableMap.of("maxRetries", "1"));
@@ -324,37 +289,17 @@ public class SubscriberTest {
initSubscriber();
MessageInfo info = new TestMessageInfo("", 1, 11, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.SKIP);
packageHandler.handle(info, message);
- waitSubscriber(RUNNING);
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false);
-
try {
waitSubscriber(IDLE);
fail("Cannot be IDLE without a validation status");
} catch (Throwable t) {
}
-
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true);
- waitSubscriber(IDLE);
-
}
- @Test
- public void testReadyWhenWatingForPrecondition() {
- Semaphore sem = new Semaphore(0);
- assumeWaitingForPrecondition(sem);
- initSubscriber();
- MessageInfo info = new TestMessageInfo("", 1, 0, 0);
- PackageMessage message = BASIC_ADD_PACKAGE;
-
- packageHandler.handle(info, message);
- waitSubscriber(RUNNING);
- await("Should report ready").until(subscriber.subscriberIdle::isReady);
- sem.release();
- }
-
private void initSubscriber() {
initSubscriber(Collections.emptyMap());
}
@@ -376,51 +321,20 @@ public class SubscriberTest {
await().until(subscriber::getState, equalTo(expectedState));
}
- private void mockMetrics() {
- Histogram histogram = Mockito.mock(Histogram.class);
- Counter counter = Mockito.mock(Counter.class);
- Meter meter = Mockito.mock(Meter.class);
- Timer timer = Mockito.mock(Timer.class);
- Timer.Context timerContext = Mockito.mock(Timer.Context.class);
- when(timer.time())
- .thenReturn(timerContext);
- when(distributionMetricsService.getImportedPackageSize())
- .thenReturn(histogram);
- when(distributionMetricsService.getItemsBufferSize())
- .thenReturn(counter);
- when(distributionMetricsService.getFailedPackageImports())
- .thenReturn(meter);
- when(distributionMetricsService.getRemovedFailedPackageDuration())
- .thenReturn(timer);
- when(distributionMetricsService.getRemovedPackageDuration())
- .thenReturn(timer);
- when(distributionMetricsService.getImportedPackageDuration())
- .thenReturn(timer);
- when(distributionMetricsService.getSendStoredStatusDuration())
- .thenReturn(timer);
- when(distributionMetricsService.getProcessQueueItemDuration())
- .thenReturn(timer);
- when(distributionMetricsService.getPackageDistributedDuration())
- .thenReturn(timer);
- }
-
private void assumeNoPrecondition() {
try {
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.ACCEPT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- private void assumeWaitingForPrecondition(Semaphore sem) {
- try {
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt()))
- .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS));
- } catch (Exception e) {
- throw new RuntimeException(e);
+ private Resource getResource(String path) throws LoginException {
+ try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) {
+ return resolver.getResource(path);
}
}
-
+
private final class WaitFor implements Answer<DistributionPackageInfo> {
private final Semaphore sem;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
similarity index 61%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
index fd4c761..2ae2803 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/BookKeeperTest.java
@@ -16,24 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.subscriber.SubscriberTest;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
import org.osgi.service.event.EventAdmin;
@RunWith(MockitoJUnitRunner.class)
@@ -43,8 +51,7 @@ public class BookKeeperTest {
private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
- @Mock
- private DistributionMetricsService distributionMetricsService;
+ private SubscriberMetrics subscriberMetrics = new SubscriberMetrics();
@Mock
private PackageHandler packageHandler;
@@ -54,13 +61,18 @@ public class BookKeeperTest {
@Mock
private Consumer<PackageStatusMessage> sender;
+
+ @Mock
+ BundleContext context;
private BookKeeper bookKeeper;
@Before
public void before() {
- bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender,
+ SubscriberIdle subscriberIdle = new SubscriberIdle(context, 300);
+ bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, subscriberIdle, eventAdmin, sender,
"subAgentName", "subSlingId", true, 10);
+ sem = new Semaphore(0);
}
@Test
@@ -78,5 +90,29 @@ public class BookKeeperTest {
assertThat(bookKeeper.loadOffset(), equalTo(20l));
}
}
+
+ @Test
+ public void testReadyWhenWatingForPrecondition() {
+
+ MessageInfo info = new TestMessageInfo("", 1, 0, 0);
+ PackageMessage message = SubscriberTest.BASIC_ADD_PACKAGE;
+ Executors.newSingleThreadExecutor().execute(() -> {
+ try {
+ bookKeeper.processPackage(info, message, this::waitForSemaphore);
+ } catch (Exception e) {
+ }
+ });
+ await("Should report ready").until(bookKeeper.subscriberIdle::isReady);
+ sem.release();
+ }
+
+ public boolean waitForSemaphore(long offset, PackageMessage msg) {
+ try {
+ return sem.tryAcquire(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ private Semaphore sem;
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
index 0868cc8..096b632 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/ContentPackageExtractorTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/ContentPackageExtractorTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static java.util.Collections.singletonList;
import static org.mockito.Mockito.verify;
@@ -37,6 +37,7 @@ import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.ContentPackageExtractor;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.sling.MockSling;
import org.apache.sling.testing.mock.sling.ResourceResolverType;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
index 11b346a..1a03199 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/LocalStoreTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.journal.service.subscriber.LocalStore;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Test;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
index 6c33412..4f2f0a4 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetriesTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/PackageRetriesTest.java
@@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.service.subscriber;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import org.apache.sling.distribution.journal.service.subscriber.PackageRetries;
+
public class PackageRetriesTest {
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
similarity index 94%
rename from src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
rename to src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
index ef5f0d7..6c40034 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/service/subscriber/SubscriberIdleTest.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.subscriber;
+package org.apache.sling.distribution.journal.service.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.sling.distribution.journal.service.subscriber.SubscriberIdle;
import org.junit.Test;
import org.mockito.Mockito;
import org.osgi.framework.BundleContext;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
similarity index 85%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index 1049ef5..50d5507 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertNotNull;
@@ -32,7 +32,8 @@ import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -94,16 +95,7 @@ public class DistributionMetricsServiceTest {
assertNotNull(metrics.getDroppedRequests());
assertNotNull(metrics.getEnqueuePackageDuration());
assertNotNull(metrics.getExportedPackageSize());
- assertNotNull(metrics.getFailedPackageImports());
- assertNotNull(metrics.getImportedPackageDuration());
- assertNotNull(metrics.getImportedPackageSize());
- assertNotNull(metrics.getItemsBufferSize());
- assertNotNull(metrics.getPackageDistributedDuration());
- assertNotNull(metrics.getProcessQueueItemDuration());
assertNotNull(metrics.getQueueCacheFetchCount());
- assertNotNull(metrics.getRemovedFailedPackageDuration());
- assertNotNull(metrics.getRemovedPackageDuration());
- assertNotNull(metrics.getSendStoredStatusDuration());
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
index bd44057..8fe7ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/ExponentialBackoffTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.time.Duration.of;
import static java.time.temporal.ChronoUnit.MILLIS;
@@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.sling.distribution.journal.shared.ExponentialBackOff;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
similarity index 90%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index ce7f279..0214b83 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@@ -35,9 +35,12 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
-import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker.JournalCheckerConfiguration;
-import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
+import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
+import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
similarity index 97%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
index fed9fef..7fd0207 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LimitPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/LimitPollerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.samePropertyValuesAs;
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.time.Duration;
import java.util.List;
-import org.apache.sling.distribution.journal.impl.shared.LimitPoller;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +41,7 @@ import org.mockito.MockitoAnnotations;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.LimitPoller;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
index 14dcc19..ede1b69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageBrowserTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageBrowserTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
similarity index 95%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
index aa2947a..f483c86 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageViewerPluginTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/PackageViewerPluginTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.containsString;
@@ -43,6 +43,9 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.PackageBrowser;
+import org.apache.sling.distribution.journal.shared.PackageViewerPlugin;
+import org.apache.sling.distribution.journal.shared.Topics;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
similarity index 93%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
index 5d31bf6..7202b6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/SimpleDistributionResponseTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SimpleDistributionResponseTest.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.sling.distribution.DistributionRequestState;
+import org.apache.sling.distribution.journal.shared.SimpleDistributionResponse;
import org.junit.Test;
public class SimpleDistributionResponseTest {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
similarity index 96%
rename from src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
rename to src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
index a2105e3..9145dfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/TestMessageInfo.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/TestMessageInfo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.shared;
+package org.apache.sling.distribution.journal.shared;
import org.apache.sling.distribution.journal.MessageInfo;