You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/12 18:33:03 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-10564 - Log package offset in the Distribution Publisher (#79)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new d801dbf  SLING-10564 - Log package offset in the Distribution Publisher (#79)
d801dbf is described below

commit d801dbf749b12c07e8734449293d6a115e7f39cc
Author: Timothee Maret <tm...@apache.org>
AuthorDate: Mon Jul 12 20:32:52 2021 +0200

    SLING-10564 - Log package offset in the Distribution Publisher (#79)
    
    * Handle queued notifications via a callback
    * Extend queue notifier to return the offset
---
 .../impl/publisher/DistributionPublisher.java      | 35 +++++-----
 .../impl/publisher/PackageQueuedNotifier.java      | 79 +++++++++++++---------
 .../journal/queue/PubQueueProvider.java            |  4 ++
 .../distribution/journal/queue/QueuedCallback.java | 30 ++++++++
 .../journal/queue/impl/PubQueueCache.java          | 20 ++----
 .../journal/queue/impl/PubQueueProviderImpl.java   | 15 +++-
 .../impl/publisher/DistributionPublisherTest.java  |  3 +-
 .../impl/publisher/PackageQueuedNotifierTest.java  | 39 ++++++++---
 .../journal/queue/impl/PubQueueCacheTest.java      |  6 +-
 9 files changed, 150 insertions(+), 81 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 56a8817..e4adb8a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -29,12 +29,12 @@ import static org.apache.sling.distribution.journal.shared.DistributionMetricsSe
 import java.io.Closeable;
 import java.util.Collections;
 import java.util.Dictionary;
-import java.util.HashMap;
+import java.util.EnumMap;
 import java.util.Hashtable;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.ToLongFunction;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
@@ -91,7 +91,7 @@ public class DistributionPublisher implements DistributionAgent {
 
     public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
 
-    private final Map<DistributionRequestType, Consumer<PackageMessage>> REQ_TYPES = new HashMap<>();
+    private final EnumMap<DistributionRequestType, ToLongFunction<PackageMessage>> reqTypes = new EnumMap<>(DistributionRequestType.class);
 
     private final DefaultDistributionLog log;
 
@@ -102,9 +102,6 @@ public class DistributionPublisher implements DistributionAgent {
     private DistributionPackageBuilder packageBuilder;
 
     @Reference
-    private PackageQueuedNotifier queuedNotifier;
-
-    @Reference
     private DiscoveryService discoveryService;
 
     @Reference
@@ -146,9 +143,9 @@ public class DistributionPublisher implements DistributionAgent {
 
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
-        REQ_TYPES.put(ADD,    this::sendAndWait);
-        REQ_TYPES.put(DELETE, this::sendAndWait);
-        REQ_TYPES.put(TEST,   this::send);
+        reqTypes.put(ADD,    this::sendAndWait);
+        reqTypes.put(DELETE, this::sendAndWait);
+        reqTypes.put(TEST,   this::send);
     }
 
     @Activate
@@ -253,7 +250,7 @@ public class DistributionPublisher implements DistributionAgent {
     public DistributionResponse execute(ResourceResolver resourceResolver,
                                         DistributionRequest request)
             throws DistributionException {
-        Consumer<PackageMessage> handler = REQ_TYPES.get(request.getRequestType());
+        ToLongFunction<PackageMessage> handler = reqTypes.get(request.getRequestType());
         if (handler != null) {
             return execute(resourceResolver, request, handler);
         } else {
@@ -263,7 +260,7 @@ public class DistributionPublisher implements DistributionAgent {
 
     private DistributionResponse execute(ResourceResolver resourceResolver,
                                          DistributionRequest request,
-                                         Consumer<PackageMessage> sender)
+                                         ToLongFunction<PackageMessage> sender)
             throws DistributionException {
         final PackageMessage pkg;
         try {
@@ -275,10 +272,10 @@ public class DistributionPublisher implements DistributionAgent {
         }
 
         try {
-            timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.accept(pkg));
+            long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.applyAsLong(pkg));
             distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
             distributionMetricsService.getAcceptedRequests().mark();
-            String msg = String.format("Request accepted with distribution package %s", pkg);
+            String msg = String.format("Request accepted with distribution package %s at offset=%s", pkg, offset);
             log.info(msg);
             return new SimpleDistributionResponse(ACCEPTED, msg);
         } catch (Throwable e) {
@@ -293,17 +290,19 @@ public class DistributionPublisher implements DistributionAgent {
         }
     }
     
-    private void send(PackageMessage pkg) {
+    private long send(PackageMessage pkg) {
         sender.accept(pkg);
+        return -1;
     }
 
-    private void sendAndWait(PackageMessage pkg) {
+    private long sendAndWait(PackageMessage pkg) {
+        PackageQueuedNotifier queuedNotifier = pubQueueProvider.getQueuedNotifier();
         try {
-            CompletableFuture<Void> received = queuedNotifier.registerWait(pkg.getPkgId());
+            CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
             Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName);
             eventAdmin.postEvent(createdEvent);
             sender.accept(pkg);
-            received.get(queuedTimeout, TimeUnit.MILLISECONDS);
+            return received.get(queuedTimeout, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             queuedNotifier.unRegisterWait(pkg.getPkgId());
             throw new RuntimeException(e);
@@ -313,7 +312,7 @@ public class DistributionPublisher implements DistributionAgent {
     @Nonnull
     private DistributionResponse executeUnsupported(DistributionRequest request) {
         String msg = String.format("Request requestType=%s not supported by this agent, expected one of %s",
-                request.getRequestType(), REQ_TYPES.keySet());
+                request.getRequestType(), reqTypes.keySet());
         log.info(msg);
         return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index b51b252..c6747b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -18,63 +18,53 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
+import java.io.Closeable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.event.DistributionEventTopics;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
 
+import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(
-        service = {PackageQueuedNotifier.class, EventHandler.class},
-        property = EventConstants.EVENT_TOPIC + "=" + DistributionEventTopics.AGENT_PACKAGE_QUEUED
-)
-public class PackageQueuedNotifier implements EventHandler {
+import static java.util.Objects.requireNonNull;
+
+public class PackageQueuedNotifier implements QueuedCallback, Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageQueuedNotifier.class);
 
+    private final EventAdmin eventAdmin;
+
     /**
      * (packageId x Future)
      */
-    private final Map<String, CompletableFuture<Void>> receiveCallbacks;
+    private final Map<String, CompletableFuture<Long>> receiveCallbacks;
     
-    public PackageQueuedNotifier() {
+    public PackageQueuedNotifier(EventAdmin eventAdmin) {
         this.receiveCallbacks = new ConcurrentHashMap<>();
+        this.eventAdmin = requireNonNull(eventAdmin);
     }
 
-    @Deactivate
-    public void deactivate() {
-        receiveCallbacks.forEach((packageId, callback) -> {
-            LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
-            callback.cancel(true);
-        });
-        LOG.info("Package queue notifier service stopped");
-    }
-
-    @Override
-    public void handleEvent(Event event) {
-        String packageId = (String) event.getProperty(DistributionEvent.PACKAGE_ID);
-        LOG.debug("Handling event for pkgId={}", packageId);
-        CompletableFuture<Void> callback = null;
-        if (packageId != null) {
-            callback = receiveCallbacks.remove(packageId);
+    private void notifyWait(String pkgId, long offset) {
+        CompletableFuture<Long> callback = null;
+        if (pkgId != null) {
+            callback = receiveCallbacks.remove(pkgId);
         }
         if (callback != null) {
-            callback.complete(null);
+            callback.complete(offset);
         }
     }
 
-    public CompletableFuture<Void> registerWait(String packageId) {
+    public CompletableFuture<Long> registerWait(String packageId) {
         LOG.debug("Registering wait condition for pkgId={}", packageId);
-        CompletableFuture<Void> packageReceived = new CompletableFuture<>();
+        CompletableFuture<Long> packageReceived = new CompletableFuture<>();
         receiveCallbacks.put(packageId, packageReceived);
         return packageReceived;
     }
@@ -83,4 +73,31 @@ public class PackageQueuedNotifier implements EventHandler {
         LOG.debug("Un-registering wait condition for pkgId={}", packageId);
         receiveCallbacks.remove(packageId);
     }
+
+    @Override
+    public void queued(List<FullMessage<PackageMessage>> fullMessages) {
+        fullMessages.forEach(this::queued);
+    }
+
+    private void queued(FullMessage<PackageMessage> fullMessage) {
+        long offset = fullMessage.getInfo().getOffset();
+        PackageMessage message = fullMessage.getMessage();
+        LOG.debug("Queued package {} at offset={}", message, offset);
+        sendQueuedEvent(message);
+        notifyWait(message.getPkgId(), offset);
+    }
+
+    private void sendQueuedEvent(PackageMessage message) {
+        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
+        eventAdmin.postEvent(queuedEvent);
+    }
+
+    @Override
+    public void close() {
+        receiveCallbacks.forEach((packageId, callback) -> {
+            LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
+            callback.cancel(true);
+        });
+        LOG.info("Package queue notifier closed");
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
index 6038afb..cdac7de 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,4 +47,7 @@ public interface PubQueueProvider extends Closeable {
      */
     Set<String> getQueueNames(String pubAgentName);
 
+    @Nonnull
+    PackageQueuedNotifier getQueuedNotifier();
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
new file mode 100644
index 0000000..0388f8a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue;
+
+import java.util.List;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+
+public interface QueuedCallback {
+
+    void queued(List<FullMessage<PackageMessage>> messages);
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
index 66474b5..09615e7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
@@ -36,16 +36,14 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.sling.distribution.journal.FullMessage;
@@ -98,14 +96,14 @@ public class PubQueueCache {
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
-    private final EventAdmin eventAdmin;
+    private final QueuedCallback queuedCallback;
 
     private volatile Closeable tailPoller; //NOSONAR
 
     private final CacheCallback callback;
     
-    public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
-        this.eventAdmin = eventAdmin;
+    public PubQueueCache(QueuedCallback queuedCallback, CacheCallback callback) {
+        this.queuedCallback = queuedCallback;
         this.callback = callback;
         tailPoller = callback.createConsumer(this::handlePackage);
     }
@@ -218,16 +216,10 @@ public class PubQueueCache {
         messages
             .forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage(message)));
         getOrCreateQueue(pubAgentName).putItems(msgs);
-        messages.forEach(this::sendQueuedEvent);
+        queuedCallback.queued(messages);
     }
 
-    private void sendQueuedEvent(FullMessage<PackageMessage> fMessage) {
-        long offset = fMessage.getInfo().getOffset();
-        LOG.debug("Queueing message package-id={}, offset={}", fMessage.getMessage().getPkgId(), offset);
-        PackageMessage message = fMessage.getMessage();
-        final Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
-        eventAdmin.postEvent(queuedEvent);
-    }
+
 
     private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
         // atomically create a new queue for
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index 6ee5c31..a71acf3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -32,8 +32,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
@@ -59,7 +61,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
     
-    private final EventAdmin eventAdmin;
+    private final PackageQueuedNotifier queuedNotifier;
 
     private final CacheCallback callback;
     
@@ -75,7 +77,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     private ServiceRegistration<?> reg;
 
     public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback callback, BundleContext context) {
-        this.eventAdmin = eventAdmin;
+        queuedNotifier = new PackageQueuedNotifier(eventAdmin);
         this.queueErrors = queueErrors;
         this.callback = callback;
         cache = newCache();
@@ -105,6 +107,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
                 LOG.info(e.getMessage(), e);
             }
         }
+        IOUtils.closeQuietly(queuedNotifier);
         LOG.info("Stopped Publisher queue provider service");
     }
     
@@ -150,6 +153,12 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
         return queueNames;
     }
 
+    @Nonnull
+    @Override
+    public PackageQueuedNotifier getQueuedNotifier() {
+        return queuedNotifier;
+    }
+
     @Nullable
     @Override
     public DistributionQueue getQueue(String pubAgentName, String queueName) {
@@ -207,7 +216,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     }
 
     private PubQueueCache newCache() {
-        return new PubQueueCache(eventAdmin, callback);
+        return new PubQueueCache(queuedNotifier, callback);
     }
 
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7d3e980..d9d8068 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -163,6 +163,7 @@ public class DistributionPublisherTest {
         when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher.activate(config, context);
         when(timer.time()).thenReturn(timerContext);
+        when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
     @After
@@ -263,7 +264,7 @@ public class DistributionPublisherTest {
     private void executeAndCheck(DistributionRequest request) throws IOException, DistributionException {
         PackageMessage pkg = mockPackage(request);
         when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver), anyString(), Mockito.eq(request))).thenReturn(pkg);
-        CompletableFuture<Void> callback = CompletableFuture.completedFuture(null);
+        CompletableFuture<Long> callback = CompletableFuture.completedFuture(-1L);
         when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback);
         when(distributionMetricsService.getExportedPackageSize()).thenReturn(histogram);
         when(distributionMetricsService.getAcceptedRequests()).thenReturn(meter);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 092402e..fa95574 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -18,47 +18,60 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.osgi.service.event.Event;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.service.event.EventAdmin;
 
 public class PackageQueuedNotifierTest {
 
     private static final String PUB_AGENT_ID = "agent1";
     private PackageQueuedNotifier notifier;
 
+    @Mock
+    private EventAdmin eventAdmin;
+
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void test() throws InterruptedException, ExecutionException, TimeoutException {
-        notifier = new PackageQueuedNotifier();
-        CompletableFuture<Void> arrived = notifier.registerWait("1");
-        notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("2"), PUB_AGENT_ID));
+        notifier = new PackageQueuedNotifier(eventAdmin);
+        CompletableFuture<Long> arrived = notifier.registerWait("1");
+        notifier.queued(singletonList(fullPkgMsg("2", 0)));
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);
             Assert.fail("Expected TimeoutException");
         } catch (TimeoutException e) {
             // Expected
         }
-        notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("1"), PUB_AGENT_ID));
+        notifier.queued(singletonList(fullPkgMsg("1", 1)));
         arrived.get(1, TimeUnit.SECONDS);
     }
 
     @Test
     public void testForNullPackage() throws InterruptedException, ExecutionException, TimeoutException {
-        notifier = new PackageQueuedNotifier();
-        CompletableFuture<Void> arrived = notifier.registerWait("1");
-        notifier.handleEvent(new Event(AGENT_PACKAGE_QUEUED, new HashMap<>()));
+        notifier = new PackageQueuedNotifier(eventAdmin);
+        CompletableFuture<Long> arrived = notifier.registerWait("1");
+        notifier.queued(emptyList());
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);
             Assert.fail("Expected TimeoutException as package ID is null");
@@ -67,6 +80,10 @@ public class PackageQueuedNotifierTest {
         }
     }
 
+    private FullMessage<PackageMessage> fullPkgMsg(String pkgId, long offset) {
+        return new FullMessage<>(new TestMessageInfo("topic_package", 0, offset, currentTimeMillis()), pkgMsg(pkgId));
+    }
+
     private PackageMessage pkgMsg(String packageId) {
         return PackageMessage.builder()
             .paths(Arrays.asList("/test"))
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index 266d54d..4e2fed3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -44,6 +44,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.After;
@@ -55,7 +56,6 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +78,7 @@ public class PubQueueCacheTest {
     private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Mock
-    private EventAdmin eventAdmin;
+    private QueuedCallback queuedCallback;
 
     @Mock
     private CacheCallback callback;
@@ -101,7 +101,7 @@ public class PubQueueCacheTest {
         when(callback.createConsumer(handlerCaptor.capture()))
                 .thenReturn(poller);
 
-        cache = new PubQueueCache(eventAdmin, callback);
+        cache = new PubQueueCache(queuedCallback, callback);
         executor = Executors.newFixedThreadPool(10);
         tailHandler = handlerCaptor.getValue();
     }