You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/12 18:33:03 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10564 - Log package offset in the Distribution Publisher (#79)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new d801dbf SLING-10564 - Log package offset in the Distribution Publisher (#79)
d801dbf is described below
commit d801dbf749b12c07e8734449293d6a115e7f39cc
Author: Timothee Maret <tm...@apache.org>
AuthorDate: Mon Jul 12 20:32:52 2021 +0200
SLING-10564 - Log package offset in the Distribution Publisher (#79)
* Handle queued notifications via a callback
* Extend queue notifier to return the offset
---
.../impl/publisher/DistributionPublisher.java | 35 +++++-----
.../impl/publisher/PackageQueuedNotifier.java | 79 +++++++++++++---------
.../journal/queue/PubQueueProvider.java | 4 ++
.../distribution/journal/queue/QueuedCallback.java | 30 ++++++++
.../journal/queue/impl/PubQueueCache.java | 20 ++----
.../journal/queue/impl/PubQueueProviderImpl.java | 15 +++-
.../impl/publisher/DistributionPublisherTest.java | 3 +-
.../impl/publisher/PackageQueuedNotifierTest.java | 39 ++++++++---
.../journal/queue/impl/PubQueueCacheTest.java | 6 +-
9 files changed, 150 insertions(+), 81 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 56a8817..e4adb8a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -29,12 +29,12 @@ import static org.apache.sling.distribution.journal.shared.DistributionMetricsSe
import java.io.Closeable;
import java.util.Collections;
import java.util.Dictionary;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.Hashtable;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
@@ -91,7 +91,7 @@ public class DistributionPublisher implements DistributionAgent {
public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
- private final Map<DistributionRequestType, Consumer<PackageMessage>> REQ_TYPES = new HashMap<>();
+ private final EnumMap<DistributionRequestType, ToLongFunction<PackageMessage>> reqTypes = new EnumMap<>(DistributionRequestType.class);
private final DefaultDistributionLog log;
@@ -102,9 +102,6 @@ public class DistributionPublisher implements DistributionAgent {
private DistributionPackageBuilder packageBuilder;
@Reference
- private PackageQueuedNotifier queuedNotifier;
-
- @Reference
private DiscoveryService discoveryService;
@Reference
@@ -146,9 +143,9 @@ public class DistributionPublisher implements DistributionAgent {
public DistributionPublisher() {
log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
- REQ_TYPES.put(ADD, this::sendAndWait);
- REQ_TYPES.put(DELETE, this::sendAndWait);
- REQ_TYPES.put(TEST, this::send);
+ reqTypes.put(ADD, this::sendAndWait);
+ reqTypes.put(DELETE, this::sendAndWait);
+ reqTypes.put(TEST, this::send);
}
@Activate
@@ -253,7 +250,7 @@ public class DistributionPublisher implements DistributionAgent {
public DistributionResponse execute(ResourceResolver resourceResolver,
DistributionRequest request)
throws DistributionException {
- Consumer<PackageMessage> handler = REQ_TYPES.get(request.getRequestType());
+ ToLongFunction<PackageMessage> handler = reqTypes.get(request.getRequestType());
if (handler != null) {
return execute(resourceResolver, request, handler);
} else {
@@ -263,7 +260,7 @@ public class DistributionPublisher implements DistributionAgent {
private DistributionResponse execute(ResourceResolver resourceResolver,
DistributionRequest request,
- Consumer<PackageMessage> sender)
+ ToLongFunction<PackageMessage> sender)
throws DistributionException {
final PackageMessage pkg;
try {
@@ -275,10 +272,10 @@ public class DistributionPublisher implements DistributionAgent {
}
try {
- timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.accept(pkg));
+ long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.applyAsLong(pkg));
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
distributionMetricsService.getAcceptedRequests().mark();
- String msg = String.format("Request accepted with distribution package %s", pkg);
+ String msg = String.format("Request accepted with distribution package %s at offset=%s", pkg, offset);
log.info(msg);
return new SimpleDistributionResponse(ACCEPTED, msg);
} catch (Throwable e) {
@@ -293,17 +290,19 @@ public class DistributionPublisher implements DistributionAgent {
}
}
- private void send(PackageMessage pkg) {
+ private long send(PackageMessage pkg) {
sender.accept(pkg);
+ return -1;
}
- private void sendAndWait(PackageMessage pkg) {
+ private long sendAndWait(PackageMessage pkg) {
+ PackageQueuedNotifier queuedNotifier = pubQueueProvider.getQueuedNotifier();
try {
- CompletableFuture<Void> received = queuedNotifier.registerWait(pkg.getPkgId());
+ CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName);
eventAdmin.postEvent(createdEvent);
sender.accept(pkg);
- received.get(queuedTimeout, TimeUnit.MILLISECONDS);
+ return received.get(queuedTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
queuedNotifier.unRegisterWait(pkg.getPkgId());
throw new RuntimeException(e);
@@ -313,7 +312,7 @@ public class DistributionPublisher implements DistributionAgent {
@Nonnull
private DistributionResponse executeUnsupported(DistributionRequest request) {
String msg = String.format("Request requestType=%s not supported by this agent, expected one of %s",
- request.getRequestType(), REQ_TYPES.keySet());
+ request.getRequestType(), reqTypes.keySet());
log.info(msg);
return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index b51b252..c6747b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -18,63 +18,53 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
+import java.io.Closeable;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.event.DistributionEventTopics;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
+import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component(
- service = {PackageQueuedNotifier.class, EventHandler.class},
- property = EventConstants.EVENT_TOPIC + "=" + DistributionEventTopics.AGENT_PACKAGE_QUEUED
-)
-public class PackageQueuedNotifier implements EventHandler {
+import static java.util.Objects.requireNonNull;
+
+public class PackageQueuedNotifier implements QueuedCallback, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PackageQueuedNotifier.class);
+ private final EventAdmin eventAdmin;
+
/**
* (packageId x Future)
*/
- private final Map<String, CompletableFuture<Void>> receiveCallbacks;
+ private final Map<String, CompletableFuture<Long>> receiveCallbacks;
- public PackageQueuedNotifier() {
+ public PackageQueuedNotifier(EventAdmin eventAdmin) {
this.receiveCallbacks = new ConcurrentHashMap<>();
+ this.eventAdmin = requireNonNull(eventAdmin);
}
- @Deactivate
- public void deactivate() {
- receiveCallbacks.forEach((packageId, callback) -> {
- LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
- callback.cancel(true);
- });
- LOG.info("Package queue notifier service stopped");
- }
-
- @Override
- public void handleEvent(Event event) {
- String packageId = (String) event.getProperty(DistributionEvent.PACKAGE_ID);
- LOG.debug("Handling event for pkgId={}", packageId);
- CompletableFuture<Void> callback = null;
- if (packageId != null) {
- callback = receiveCallbacks.remove(packageId);
+ private void notifyWait(String pkgId, long offset) {
+ CompletableFuture<Long> callback = null;
+ if (pkgId != null) {
+ callback = receiveCallbacks.remove(pkgId);
}
if (callback != null) {
- callback.complete(null);
+ callback.complete(offset);
}
}
- public CompletableFuture<Void> registerWait(String packageId) {
+ public CompletableFuture<Long> registerWait(String packageId) {
LOG.debug("Registering wait condition for pkgId={}", packageId);
- CompletableFuture<Void> packageReceived = new CompletableFuture<>();
+ CompletableFuture<Long> packageReceived = new CompletableFuture<>();
receiveCallbacks.put(packageId, packageReceived);
return packageReceived;
}
@@ -83,4 +73,31 @@ public class PackageQueuedNotifier implements EventHandler {
LOG.debug("Un-registering wait condition for pkgId={}", packageId);
receiveCallbacks.remove(packageId);
}
+
+ @Override
+ public void queued(List<FullMessage<PackageMessage>> fullMessages) {
+ fullMessages.forEach(this::queued);
+ }
+
+ private void queued(FullMessage<PackageMessage> fullMessage) {
+ long offset = fullMessage.getInfo().getOffset();
+ PackageMessage message = fullMessage.getMessage();
+ LOG.debug("Queued package {} at offset={}", message, offset);
+ sendQueuedEvent(message);
+ notifyWait(message.getPkgId(), offset);
+ }
+
+ private void sendQueuedEvent(PackageMessage message) {
+ Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
+ eventAdmin.postEvent(queuedEvent);
+ }
+
+ @Override
+ public void close() {
+ receiveCallbacks.forEach((packageId, callback) -> {
+ LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
+ callback.cancel(true);
+ });
+ LOG.info("Package queue notifier closed");
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
index 6038afb..cdac7de 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,4 +47,7 @@ public interface PubQueueProvider extends Closeable {
*/
Set<String> getQueueNames(String pubAgentName);
+ @Nonnull
+ PackageQueuedNotifier getQueuedNotifier();
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
new file mode 100644
index 0000000..0388f8a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue;
+
+import java.util.List;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+
+public interface QueuedCallback {
+
+ void queued(List<FullMessage<PackageMessage>> messages);
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
index 66474b5..09615e7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
@@ -36,16 +36,14 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.FullMessage;
@@ -98,14 +96,14 @@ public class PubQueueCache {
private final Set<JMXRegistration> jmxRegs = new HashSet<>();
- private final EventAdmin eventAdmin;
+ private final QueuedCallback queuedCallback;
private volatile Closeable tailPoller; //NOSONAR
private final CacheCallback callback;
- public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
- this.eventAdmin = eventAdmin;
+ public PubQueueCache(QueuedCallback queuedCallback, CacheCallback callback) {
+ this.queuedCallback = queuedCallback;
this.callback = callback;
tailPoller = callback.createConsumer(this::handlePackage);
}
@@ -218,16 +216,10 @@ public class PubQueueCache {
messages
.forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage(message)));
getOrCreateQueue(pubAgentName).putItems(msgs);
- messages.forEach(this::sendQueuedEvent);
+ queuedCallback.queued(messages);
}
- private void sendQueuedEvent(FullMessage<PackageMessage> fMessage) {
- long offset = fMessage.getInfo().getOffset();
- LOG.debug("Queueing message package-id={}, offset={}", fMessage.getMessage().getPkgId(), offset);
- PackageMessage message = fMessage.getMessage();
- final Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
- eventAdmin.postEvent(queuedEvent);
- }
+
private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
// atomically create a new queue for
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index 6ee5c31..a71acf3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -32,8 +32,10 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.queue.CacheCallback;
@@ -59,7 +61,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
- private final EventAdmin eventAdmin;
+ private final PackageQueuedNotifier queuedNotifier;
private final CacheCallback callback;
@@ -75,7 +77,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
private ServiceRegistration<?> reg;
public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback callback, BundleContext context) {
- this.eventAdmin = eventAdmin;
+ queuedNotifier = new PackageQueuedNotifier(eventAdmin);
this.queueErrors = queueErrors;
this.callback = callback;
cache = newCache();
@@ -105,6 +107,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
LOG.info(e.getMessage(), e);
}
}
+ IOUtils.closeQuietly(queuedNotifier);
LOG.info("Stopped Publisher queue provider service");
}
@@ -150,6 +153,12 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
return queueNames;
}
+ @Nonnull
+ @Override
+ public PackageQueuedNotifier getQueuedNotifier() {
+ return queuedNotifier;
+ }
+
@Nullable
@Override
public DistributionQueue getQueue(String pubAgentName, String queueName) {
@@ -207,7 +216,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
}
private PubQueueCache newCache() {
- return new PubQueueCache(eventAdmin, callback);
+ return new PubQueueCache(queuedNotifier, callback);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7d3e980..d9d8068 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -163,6 +163,7 @@ public class DistributionPublisherTest {
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
publisher.activate(config, context);
when(timer.time()).thenReturn(timerContext);
+ when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
}
@After
@@ -263,7 +264,7 @@ public class DistributionPublisherTest {
private void executeAndCheck(DistributionRequest request) throws IOException, DistributionException {
PackageMessage pkg = mockPackage(request);
when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver), anyString(), Mockito.eq(request))).thenReturn(pkg);
- CompletableFuture<Void> callback = CompletableFuture.completedFuture(null);
+ CompletableFuture<Long> callback = CompletableFuture.completedFuture(-1L);
when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback);
when(distributionMetricsService.getExportedPackageSize()).thenReturn(histogram);
when(distributionMetricsService.getAcceptedRequests()).thenReturn(meter);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 092402e..fa95574 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -18,47 +18,60 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
-import static org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.osgi.service.event.Event;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.service.event.EventAdmin;
public class PackageQueuedNotifierTest {
private static final String PUB_AGENT_ID = "agent1";
private PackageQueuedNotifier notifier;
+ @Mock
+ private EventAdmin eventAdmin;
+
+ @Before
+ public void before() {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
public void test() throws InterruptedException, ExecutionException, TimeoutException {
- notifier = new PackageQueuedNotifier();
- CompletableFuture<Void> arrived = notifier.registerWait("1");
- notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("2"), PUB_AGENT_ID));
+ notifier = new PackageQueuedNotifier(eventAdmin);
+ CompletableFuture<Long> arrived = notifier.registerWait("1");
+ notifier.queued(singletonList(fullPkgMsg("2", 0)));
try {
arrived.get(100,TimeUnit.MILLISECONDS);
Assert.fail("Expected TimeoutException");
} catch (TimeoutException e) {
// Expected
}
- notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("1"), PUB_AGENT_ID));
+ notifier.queued(singletonList(fullPkgMsg("1", 1)));
arrived.get(1, TimeUnit.SECONDS);
}
@Test
public void testForNullPackage() throws InterruptedException, ExecutionException, TimeoutException {
- notifier = new PackageQueuedNotifier();
- CompletableFuture<Void> arrived = notifier.registerWait("1");
- notifier.handleEvent(new Event(AGENT_PACKAGE_QUEUED, new HashMap<>()));
+ notifier = new PackageQueuedNotifier(eventAdmin);
+ CompletableFuture<Long> arrived = notifier.registerWait("1");
+ notifier.queued(emptyList());
try {
arrived.get(100,TimeUnit.MILLISECONDS);
Assert.fail("Expected TimeoutException as package ID is null");
@@ -67,6 +80,10 @@ public class PackageQueuedNotifierTest {
}
}
+ private FullMessage<PackageMessage> fullPkgMsg(String pkgId, long offset) {
+ return new FullMessage<>(new TestMessageInfo("topic_package", 0, offset, currentTimeMillis()), pkgMsg(pkgId));
+ }
+
private PackageMessage pkgMsg(String packageId) {
return PackageMessage.builder()
.paths(Arrays.asList("/test"))
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index 266d54d..4e2fed3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -44,6 +44,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.junit.After;
@@ -55,7 +56,6 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +78,7 @@ public class PubQueueCacheTest {
private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
@Mock
- private EventAdmin eventAdmin;
+ private QueuedCallback queuedCallback;
@Mock
private CacheCallback callback;
@@ -101,7 +101,7 @@ public class PubQueueCacheTest {
when(callback.createConsumer(handlerCaptor.capture()))
.thenReturn(poller);
- cache = new PubQueueCache(eventAdmin, callback);
+ cache = new PubQueueCache(queuedCallback, callback);
executor = Executors.newFixedThreadPool(10);
tailHandler = handlerCaptor.getValue();
}