You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2021/07/08 20:37:56 UTC

[sling-org-apache-sling-distribution-journal] branch SLING-10564 created (now 40ebdc9)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a change to branch SLING-10564
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at 40ebdc9  SLING-10564 - Handle queued notifications via a callback

This branch includes the following new commits:

     new e87e807  SLING-10564 - Extend queue notifier to return a long
     new 40ebdc9  SLING-10564 - Handle queued notifications via a callback

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[sling-org-apache-sling-distribution-journal] 02/02: SLING-10564 - Handle queued notifications via a callback

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-10564
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 40ebdc99373ae658ad02d48ba8c782686911eab2
Author: tmaret <tm...@adobe.com>
AuthorDate: Thu Jul 8 22:37:17 2021 +0200

    SLING-10564 - Handle queued notifications via a callback
---
 .../impl/publisher/DistributionPublisher.java      |  4 +-
 .../impl/publisher/PackageQueuedNotifier.java      | 70 +++++++++++++---------
 .../journal/queue/PubQueueProvider.java            |  4 ++
 .../distribution/journal/queue/QueuedCallback.java | 30 ++++++++++
 .../journal/queue/impl/PubQueueCache.java          | 20 ++-----
 .../journal/queue/impl/PubQueueProviderImpl.java   | 15 ++++-
 .../impl/publisher/DistributionPublisherTest.java  |  1 +
 .../impl/publisher/PackageQueuedNotifierTest.java  | 35 ++++++++---
 .../journal/queue/impl/PubQueueCacheTest.java      |  6 +-
 9 files changed, 126 insertions(+), 59 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 12a75cf..0e4ccbc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -103,9 +103,6 @@ public class DistributionPublisher implements DistributionAgent {
     private DistributionPackageBuilder packageBuilder;
 
     @Reference
-    private PackageQueuedNotifier queuedNotifier;
-
-    @Reference
     private DiscoveryService discoveryService;
 
     @Reference
@@ -300,6 +297,7 @@ public class DistributionPublisher implements DistributionAgent {
     }
 
     private long sendAndWait(PackageMessage pkg) {
+        PackageQueuedNotifier queuedNotifier = pubQueueProvider.getQueuedNotifier();
         try {
             CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
             Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index 6862db3..c6747b5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -18,55 +18,44 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
+import java.io.Closeable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
-import org.apache.sling.distribution.event.DistributionEventTopics;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
 
+import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(
-        service = {PackageQueuedNotifier.class, EventHandler.class},
-        property = EventConstants.EVENT_TOPIC + "=" + DistributionEventTopics.AGENT_PACKAGE_QUEUED
-)
-public class PackageQueuedNotifier implements EventHandler {
+import static java.util.Objects.requireNonNull;
+
+public class PackageQueuedNotifier implements QueuedCallback, Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(PackageQueuedNotifier.class);
 
+    private final EventAdmin eventAdmin;
+
     /**
      * (packageId x Future)
      */
     private final Map<String, CompletableFuture<Long>> receiveCallbacks;
     
-    public PackageQueuedNotifier() {
+    public PackageQueuedNotifier(EventAdmin eventAdmin) {
         this.receiveCallbacks = new ConcurrentHashMap<>();
+        this.eventAdmin = requireNonNull(eventAdmin);
     }
 
-    @Deactivate
-    public void deactivate() {
-        receiveCallbacks.forEach((packageId, callback) -> {
-            LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
-            callback.cancel(true);
-        });
-        LOG.info("Package queue notifier service stopped");
-    }
-
-    @Override
-    public void handleEvent(Event event) {
-        String packageId = (String) event.getProperty(DistributionEvent.PACKAGE_ID);
-        long offset = -1; // TODO get the offset from event or from the queue cache
-        LOG.debug("Handling event for pkgId={}", packageId);
+    private void notifyWait(String pkgId, long offset) {
         CompletableFuture<Long> callback = null;
-        if (packageId != null) {
-            callback = receiveCallbacks.remove(packageId);
+        if (pkgId != null) {
+            callback = receiveCallbacks.remove(pkgId);
         }
         if (callback != null) {
             callback.complete(offset);
@@ -84,4 +73,31 @@ public class PackageQueuedNotifier implements EventHandler {
         LOG.debug("Un-registering wait condition for pkgId={}", packageId);
         receiveCallbacks.remove(packageId);
     }
+
+    @Override
+    public void queued(List<FullMessage<PackageMessage>> fullMessages) {
+        fullMessages.forEach(this::queued);
+    }
+
+    private void queued(FullMessage<PackageMessage> fullMessage) {
+        long offset = fullMessage.getInfo().getOffset();
+        PackageMessage message = fullMessage.getMessage();
+        LOG.debug("Queued package {} at offset={}", message, offset);
+        sendQueuedEvent(message);
+        notifyWait(message.getPkgId(), offset);
+    }
+
+    private void sendQueuedEvent(PackageMessage message) {
+        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
+        eventAdmin.postEvent(queuedEvent);
+    }
+
+    @Override
+    public void close() {
+        receiveCallbacks.forEach((packageId, callback) -> {
+            LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
+            callback.cancel(true);
+        });
+        LOG.info("Package queue notifier closed");
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
index 6038afb..cdac7de 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/PubQueueProvider.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,4 +47,7 @@ public interface PubQueueProvider extends Closeable {
      */
     Set<String> getQueueNames(String pubAgentName);
 
+    @Nonnull
+    PackageQueuedNotifier getQueuedNotifier();
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
new file mode 100644
index 0000000..0388f8a
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/QueuedCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue;
+
+import java.util.List;
+
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+
+public interface QueuedCallback {
+
+    void queued(List<FullMessage<PackageMessage>> messages);
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
index 66474b5..09615e7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java
@@ -36,16 +36,14 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.queue.QueueItemFactory;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.sling.distribution.journal.FullMessage;
@@ -98,14 +96,14 @@ public class PubQueueCache {
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
-    private final EventAdmin eventAdmin;
+    private final QueuedCallback queuedCallback;
 
     private volatile Closeable tailPoller; //NOSONAR
 
     private final CacheCallback callback;
     
-    public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
-        this.eventAdmin = eventAdmin;
+    public PubQueueCache(QueuedCallback queuedCallback, CacheCallback callback) {
+        this.queuedCallback = queuedCallback;
         this.callback = callback;
         tailPoller = callback.createConsumer(this::handlePackage);
     }
@@ -218,16 +216,10 @@ public class PubQueueCache {
         messages
             .forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage(message)));
         getOrCreateQueue(pubAgentName).putItems(msgs);
-        messages.forEach(this::sendQueuedEvent);
+        queuedCallback.queued(messages);
     }
 
-    private void sendQueuedEvent(FullMessage<PackageMessage> fMessage) {
-        long offset = fMessage.getInfo().getOffset();
-        LOG.debug("Queueing message package-id={}, offset={}", fMessage.getMessage().getPkgId(), offset);
-        PackageMessage message = fMessage.getMessage();
-        final Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
-        eventAdmin.postEvent(queuedEvent);
-    }
+
 
     private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
         // atomically create a new queue for
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index 6ee5c31..a71acf3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -32,8 +32,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
@@ -59,7 +61,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
     
-    private final EventAdmin eventAdmin;
+    private final PackageQueuedNotifier queuedNotifier;
 
     private final CacheCallback callback;
     
@@ -75,7 +77,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     private ServiceRegistration<?> reg;
 
     public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback callback, BundleContext context) {
-        this.eventAdmin = eventAdmin;
+        queuedNotifier = new PackageQueuedNotifier(eventAdmin);
         this.queueErrors = queueErrors;
         this.callback = callback;
         cache = newCache();
@@ -105,6 +107,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
                 LOG.info(e.getMessage(), e);
             }
         }
+        IOUtils.closeQuietly(queuedNotifier);
         LOG.info("Stopped Publisher queue provider service");
     }
     
@@ -150,6 +153,12 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
         return queueNames;
     }
 
+    @Nonnull
+    @Override
+    public PackageQueuedNotifier getQueuedNotifier() {
+        return queuedNotifier;
+    }
+
     @Nullable
     @Override
     public DistributionQueue getQueue(String pubAgentName, String queueName) {
@@ -207,7 +216,7 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     }
 
     private PubQueueCache newCache() {
-        return new PubQueueCache(eventAdmin, callback);
+        return new PubQueueCache(queuedNotifier, callback);
     }
 
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 980ff01..d9d8068 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -163,6 +163,7 @@ public class DistributionPublisherTest {
         when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher.activate(config, context);
         when(timer.time()).thenReturn(timerContext);
+        when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
     @After
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 7954e3c..fa95574 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -18,47 +18,60 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static org.apache.sling.distribution.event.DistributionEventTopics.AGENT_PACKAGE_QUEUED;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.osgi.service.event.Event;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.service.event.EventAdmin;
 
 public class PackageQueuedNotifierTest {
 
     private static final String PUB_AGENT_ID = "agent1";
     private PackageQueuedNotifier notifier;
 
+    @Mock
+    private EventAdmin eventAdmin;
+
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void test() throws InterruptedException, ExecutionException, TimeoutException {
-        notifier = new PackageQueuedNotifier();
+        notifier = new PackageQueuedNotifier(eventAdmin);
         CompletableFuture<Long> arrived = notifier.registerWait("1");
-        notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("2"), PUB_AGENT_ID));
+        notifier.queued(singletonList(fullPkgMsg("2", 0)));
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);
             Assert.fail("Expected TimeoutException");
         } catch (TimeoutException e) {
             // Expected
         }
-        notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("1"), PUB_AGENT_ID));
+        notifier.queued(singletonList(fullPkgMsg("1", 1)));
         arrived.get(1, TimeUnit.SECONDS);
     }
 
     @Test
     public void testForNullPackage() throws InterruptedException, ExecutionException, TimeoutException {
-        notifier = new PackageQueuedNotifier();
+        notifier = new PackageQueuedNotifier(eventAdmin);
         CompletableFuture<Long> arrived = notifier.registerWait("1");
-        notifier.handleEvent(new Event(AGENT_PACKAGE_QUEUED, new HashMap<>()));
+        notifier.queued(emptyList());
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);
             Assert.fail("Expected TimeoutException as package ID is null");
@@ -67,6 +80,10 @@ public class PackageQueuedNotifierTest {
         }
     }
 
+    private FullMessage<PackageMessage> fullPkgMsg(String pkgId, long offset) {
+        return new FullMessage<>(new TestMessageInfo("topic_package", 0, offset, currentTimeMillis()), pkgMsg(pkgId));
+    }
+
     private PackageMessage pkgMsg(String packageId) {
         return PackageMessage.builder()
             .paths(Arrays.asList("/test"))
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index 266d54d..4e2fed3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -44,6 +44,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.queue.QueuedCallback;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.After;
@@ -55,7 +56,6 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +78,7 @@ public class PubQueueCacheTest {
     private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Mock
-    private EventAdmin eventAdmin;
+    private QueuedCallback queuedCallback;
 
     @Mock
     private CacheCallback callback;
@@ -101,7 +101,7 @@ public class PubQueueCacheTest {
         when(callback.createConsumer(handlerCaptor.capture()))
                 .thenReturn(poller);
 
-        cache = new PubQueueCache(eventAdmin, callback);
+        cache = new PubQueueCache(queuedCallback, callback);
         executor = Executors.newFixedThreadPool(10);
         tailHandler = handlerCaptor.getValue();
     }

[sling-org-apache-sling-distribution-journal] 01/02: SLING-10564 - Extend queue notifier to return a long

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-10564
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit e87e8075928532a2649028549ee1ac1b1c4bade6
Author: tmaret <tm...@adobe.com>
AuthorDate: Thu Jul 8 16:33:18 2021 +0200

    SLING-10564 - Extend queue notifier to return a long
---
 .../impl/publisher/DistributionPublisher.java        | 20 +++++++++++---------
 .../impl/publisher/PackageQueuedNotifier.java        | 11 ++++++-----
 .../impl/publisher/DistributionPublisherTest.java    |  2 +-
 .../impl/publisher/PackageQueuedNotifierTest.java    |  4 ++--
 4 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 56a8817..12a75cf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
@@ -91,7 +92,7 @@ public class DistributionPublisher implements DistributionAgent {
 
     public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
 
-    private final Map<DistributionRequestType, Consumer<PackageMessage>> REQ_TYPES = new HashMap<>();
+    private final Map<DistributionRequestType, Function<PackageMessage, Long>> REQ_TYPES = new HashMap<>();
 
     private final DefaultDistributionLog log;
 
@@ -253,7 +254,7 @@ public class DistributionPublisher implements DistributionAgent {
     public DistributionResponse execute(ResourceResolver resourceResolver,
                                         DistributionRequest request)
             throws DistributionException {
-        Consumer<PackageMessage> handler = REQ_TYPES.get(request.getRequestType());
+        Function<PackageMessage, Long> handler = REQ_TYPES.get(request.getRequestType());
         if (handler != null) {
             return execute(resourceResolver, request, handler);
         } else {
@@ -263,7 +264,7 @@ public class DistributionPublisher implements DistributionAgent {
 
     private DistributionResponse execute(ResourceResolver resourceResolver,
                                          DistributionRequest request,
-                                         Consumer<PackageMessage> sender)
+                                         Function<PackageMessage, Long> sender)
             throws DistributionException {
         final PackageMessage pkg;
         try {
@@ -275,10 +276,10 @@ public class DistributionPublisher implements DistributionAgent {
         }
 
         try {
-            timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.accept(pkg));
+            long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.apply(pkg));
             distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
             distributionMetricsService.getAcceptedRequests().mark();
-            String msg = String.format("Request accepted with distribution package %s", pkg);
+            String msg = String.format("Request accepted with distribution package %s at offset=%s", pkg, offset);
             log.info(msg);
             return new SimpleDistributionResponse(ACCEPTED, msg);
         } catch (Throwable e) {
@@ -293,17 +294,18 @@ public class DistributionPublisher implements DistributionAgent {
         }
     }
     
-    private void send(PackageMessage pkg) {
+    private long send(PackageMessage pkg) {
         sender.accept(pkg);
+        return -1;
     }
 
-    private void sendAndWait(PackageMessage pkg) {
+    private long sendAndWait(PackageMessage pkg) {
         try {
-            CompletableFuture<Void> received = queuedNotifier.registerWait(pkg.getPkgId());
+            CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId());
             Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName);
             eventAdmin.postEvent(createdEvent);
             sender.accept(pkg);
-            received.get(queuedTimeout, TimeUnit.MILLISECONDS);
+            return received.get(queuedTimeout, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             queuedNotifier.unRegisterWait(pkg.getPkgId());
             throw new RuntimeException(e);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
index b51b252..6862db3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifier.java
@@ -44,7 +44,7 @@ public class PackageQueuedNotifier implements EventHandler {
     /**
      * (packageId x Future)
      */
-    private final Map<String, CompletableFuture<Void>> receiveCallbacks;
+    private final Map<String, CompletableFuture<Long>> receiveCallbacks;
     
     public PackageQueuedNotifier() {
         this.receiveCallbacks = new ConcurrentHashMap<>();
@@ -62,19 +62,20 @@ public class PackageQueuedNotifier implements EventHandler {
     @Override
     public void handleEvent(Event event) {
         String packageId = (String) event.getProperty(DistributionEvent.PACKAGE_ID);
+        long offset = -1; // TODO get the offset from event or from the queue cache
         LOG.debug("Handling event for pkgId={}", packageId);
-        CompletableFuture<Void> callback = null;
+        CompletableFuture<Long> callback = null;
         if (packageId != null) {
             callback = receiveCallbacks.remove(packageId);
         }
         if (callback != null) {
-            callback.complete(null);
+            callback.complete(offset);
         }
     }
 
-    public CompletableFuture<Void> registerWait(String packageId) {
+    public CompletableFuture<Long> registerWait(String packageId) {
         LOG.debug("Registering wait condition for pkgId={}", packageId);
-        CompletableFuture<Void> packageReceived = new CompletableFuture<>();
+        CompletableFuture<Long> packageReceived = new CompletableFuture<>();
         receiveCallbacks.put(packageId, packageReceived);
         return packageReceived;
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 7d3e980..980ff01 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -263,7 +263,7 @@ public class DistributionPublisherTest {
     private void executeAndCheck(DistributionRequest request) throws IOException, DistributionException {
         PackageMessage pkg = mockPackage(request);
         when(factory.create(Matchers.any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver), anyString(), Mockito.eq(request))).thenReturn(pkg);
-        CompletableFuture<Void> callback = CompletableFuture.completedFuture(null);
+        CompletableFuture<Long> callback = CompletableFuture.completedFuture(-1L);
         when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback);
         when(distributionMetricsService.getExportedPackageSize()).thenReturn(histogram);
         when(distributionMetricsService.getAcceptedRequests()).thenReturn(meter);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
index 092402e..7954e3c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageQueuedNotifierTest.java
@@ -42,7 +42,7 @@ public class PackageQueuedNotifierTest {
     @Test
     public void test() throws InterruptedException, ExecutionException, TimeoutException {
         notifier = new PackageQueuedNotifier();
-        CompletableFuture<Void> arrived = notifier.registerWait("1");
+        CompletableFuture<Long> arrived = notifier.registerWait("1");
         notifier.handleEvent(DistributionEvent.eventPackageQueued(pkgMsg("2"), PUB_AGENT_ID));
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);
@@ -57,7 +57,7 @@ public class PackageQueuedNotifierTest {
     @Test
     public void testForNullPackage() throws InterruptedException, ExecutionException, TimeoutException {
         notifier = new PackageQueuedNotifier();
-        CompletableFuture<Void> arrived = notifier.registerWait("1");
+        CompletableFuture<Long> arrived = notifier.registerWait("1");
         notifier.handleEvent(new Event(AGENT_PACKAGE_QUEUED, new HashMap<>()));
         try {
             arrived.get(100,TimeUnit.MILLISECONDS);