You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/23 08:49:56 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-9577 - Switch to seeding thread (#52)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 766733f  SLING-9577 - Switch to seeding thread (#52)
766733f is described below

commit 766733fe1da9eb79ae33ba25fa4355301f49f49b
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Thu Jul 23 10:49:45 2020 +0200

    SLING-9577 - Switch to seeding thread (#52)
    
    * SLING-9577 - Switch to seeding thread
    
    * SLING-9577 - Remove unnecessar parameters
    
    * SLING-9577 - Remove unnecessary logging
    
    * SLING-9577 - Add count so we can monitor numnber of seeding messages
    
    * SLING-9577 - Start counting with 1
    
    * SLING-9577 - Limit maximum number of messages sent on seeder thread
    
    * SLING-9577 - Use exponential backoff and unlimited messages on seeder thread
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 59 ++------------
 .../impl/queue/impl/PubQueueCacheService.java      | 35 ++-------
 .../journal/impl/queue/impl/QueueCacheSeeder.java  | 90 +++++++++++-----------
 .../impl/queue/impl/QueueCacheSeederTask.java      | 61 ---------------
 .../journal/impl/queue/impl/PubQueueCacheTest.java | 54 +++++++------
 .../impl/queue/impl/PubQueueProviderTest.java      | 17 +---
 .../impl/queue/impl/QueueCacheSeederTest.java      | 63 ++-------------
 7 files changed, 96 insertions(+), 283 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 940313b..fdd6ff7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -37,13 +37,11 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -102,12 +100,6 @@ public class PubQueueCache {
      */
     private final AtomicLong maxOffset = new AtomicLong(-1L);
 
-    /**
-     * Holds the last known seed offset stored to the
-     * seed store.
-     */
-    private volatile long seedOffset = -1L;
-
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
     private final MessagingProvider messagingProvider;
@@ -120,44 +112,23 @@ public class PubQueueCache {
 
     private final String topic;
 
-    private final LocalStore seedStore;
-
     private final DistributionMetricsService distributionMetricsService;
     
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
         this.topic = topic;
-        this.seedStore = seedStore;
         this.seeder = seeder;
-        Long offset = seedStore.load("offset", Long.class);
-        if (offset != null) {
-            seedOffset = offset;
-            startPoller(seedOffset);
-            /*
-             * We need at least one seeding message
-             * for cases where the seedOffset is no
-             * longer on the journal.
-             */
-            seeder.seedOne();
-        } else {
-            /*
-             * Fallback to seeding messages when
-             * no offset could be found in the
-             * repository.
-             */
-            seeder.seed(this::startPoller);
-        }
+        startPoller();
+        this.seeder.startSeeding();
     }
 
-    private void startPoller(long offset) {
-        LOG.info("Seed with offset: {}", offset);
-        String assignTo = messagingProvider.assignTo(offset);
+    private void startPoller() {
+        LOG.info("Starting consumer");
         tailPoller = messagingProvider.createPoller(
                 this.topic,
-                Reset.earliest,
-                assignTo,
+                Reset.latest,
                 create(PackageMessage.class, this::handlePackage) 
                 );
     }
@@ -171,23 +142,6 @@ public class PubQueueCache {
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
 
-    public void storeSeed() {
-        long newSeed = maxOffset.longValue();
-        if (newSeed > seedOffset) {
-            storeSeed(newSeed);
-            seedOffset = newSeed;
-        }
-    }
-
-    private void storeSeed(long offset) {
-        LOG.info("Store seed offset {}", offset);
-        try {
-            seedStore.store("offset", offset);
-        } catch (PersistenceException e) {
-            LOG.warn("Failed to persist seed offset", e);
-        }
-    }
-
     public int size() {
         return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
     }
@@ -320,6 +274,7 @@ public class PubQueueCache {
     }
 
     private void handlePackage(final MessageInfo info, final PackageMessage message) {
+        seeder.close();
         merge(singletonList(new FullMessage<>(info, message)));
         updateMaxOffset(info.getOffset());
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index f9291ed..b8d6692 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -21,16 +21,15 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -75,35 +74,20 @@ public class PubQueueCacheService {
     @Reference
     private DistributionMetricsService distributionMetricsService;
 
-    @Reference
-    private SlingSettingsService slingSettings;
-
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-
     private volatile PubQueueCache cache;
 
-    private String pubSlingId;
-
     public PubQueueCacheService() {}
 
     public PubQueueCacheService(MessagingProvider messagingProvider,
                                 Topics topics,
-                                EventAdmin eventAdmin,
-                                SlingSettingsService slingSettingsService,
-                                ResourceResolverFactory resolverFactory,
-                                String pubSlingId) {
+                                EventAdmin eventAdmin) {
         this.messagingProvider = messagingProvider;
         this.topics = topics;
         this.eventAdmin = eventAdmin;
-        this.slingSettings = slingSettingsService;
-        this.resolverFactory = resolverFactory;
-        this.pubSlingId = pubSlingId;
     }
 
     @Activate
     public void activate() {
-        pubSlingId = slingSettings.getSlingId();
         cache = newCache();
         LOG.info("Started Publisher queue cache service");
     }
@@ -145,17 +129,10 @@ public class PubQueueCacheService {
         }
     }
 
-    public void storeSeed() {
-        PubQueueCache queueCache = this.cache;
-        if (queueCache != null) {
-            queueCache.storeSeed();
-        }
-    }
-
     private PubQueueCache newCache() {
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
         String topic = topics.getPackageTopic();
-        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic);
-        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder);
+        MessageSender<PackageMessage> sender = messagingProvider.createSender(topic);
+        QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
+        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
index e3ac96c..26a6eaa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -18,25 +18,18 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
 import java.io.Closeable;
 import java.util.UUID;
-import java.util.function.LongConsumer;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-
 public class QueueCacheSeeder implements Closeable {
-
-
     private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);
 
     /**
@@ -44,45 +37,48 @@ public class QueueCacheSeeder implements Closeable {
      */
     private static final long CACHE_SEEDING_DELAY_MS = 10_000;
 
-    private final String topic;
-
-    private final MessagingProvider provider;
-
-    private volatile Closeable poller;
+    private static final int MAX_CACHE_SEEDING_DELAY_MS = 900_000; // 15 minutes
 
     private volatile boolean closed;
 
-    public QueueCacheSeeder(MessagingProvider provider, String topic) {
-        this.provider = provider;
-        this.topic = topic;
-    }
+    private MessageSender<PackageMessage> sender;
 
-    public void seedOne() {
-        startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed");
+    private Thread seedingThread;
+    
+    public QueueCacheSeeder(MessageSender<PackageMessage> sender) {
+        this.sender = sender;
     }
 
-    public void seed(LongConsumer callback) {
-        poller = provider.createPoller(topic, Reset.latest,
-                create(PackageMessage.class, (info, msg) -> {
-                    close();
-                    callback.accept(info.getOffset());
-                }));
-        startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+    public void startSeeding() {
+        seedingThread = startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
     }
 
     @Override
     public void close() {
-        closed = true;
-        IOUtils.closeQuietly(poller);
+        if (!closed) {
+            closed = true;
+            try {
+                seedingThread.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
+    /**
+     * We repeatedly send seeding messages as the first message is sometimes not received by the consumer.
+     */
     private void sendSeedingMessages() {
         LOG.info("Start message seeder");
+        int count = 1;
+        long cacheSeedingDelay = CACHE_SEEDING_DELAY_MS;
         try {
-            MessageSender<PackageMessage> sender = provider.createSender(topic);
             while (!closed) {
-                sendSeedingMessage(sender);
-                delay(CACHE_SEEDING_DELAY_MS);
+                LOG.info("Send seeding message {} then wait {} ms", count, cacheSeedingDelay);
+                sendSeedingMessage();
+                delay(cacheSeedingDelay);
+                cacheSeedingDelay = Math.min(cacheSeedingDelay * 2, MAX_CACHE_SEEDING_DELAY_MS);
+                count++;
             }
         } finally {
             LOG.info("Stop message seeder");
@@ -90,25 +86,31 @@ public class QueueCacheSeeder implements Closeable {
     }
 
     private void sendSeedingMessage() {
-        sendSeedingMessage(provider.createSender(topic));
-    }
-
-    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
-        PackageMessage pkgMsg = createTestMessage();
-        LOG.info("Send seeding message");
         try {
+            PackageMessage pkgMsg = createTestMessage();
             sender.send(pkgMsg);
         } catch (MessagingException e) {
             LOG.warn(e.getMessage(), e);
-            delay(CACHE_SEEDING_DELAY_MS * 10);
         }
     }
 
-    private static void delay(long sleepMs) {
-        try {
-            Thread.sleep(sleepMs);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+    /**
+     * Sleep with handling of interrupts and quick exit in case of closed.
+     * We do not interrupt the seeder thread from outside as this sometimes fails in the messaging impl code.
+     * 
+     * @param sleepMs milliseconds to sleep
+     */
+    private void delay(long sleepMs) {
+        long sleepCycles = sleepMs / 100;
+        for (int curCycle=0; curCycle < sleepCycles; curCycle++) {
+            if (closed) {
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
deleted file mode 100644
index b18ee55..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
-import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
-
-/**
- * Periodical task to persist a cache seed
- * to the repository. The task must run only
- * on the leader instance to avoid concurrent
- * writes and reduce write operations in
- * clustered deployments.
- */
-@Component(
-        service = Runnable.class,
-        property = {
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER,
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes
-        })
-@ParametersAreNonnullByDefault
-public class QueueCacheSeederTask implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeederTask.class);
-
-    @Reference
-    private PubQueueCacheService queueCacheService;
-
-    @Override
-    public void run() {
-        LOG.debug("Starting package cache seeder task");
-        queueCacheService.storeSeed();
-        LOG.debug("Stopping package cache seeder task");
-    }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 0c9b634..aca7d01 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
@@ -42,6 +42,7 @@ import java.util.stream.LongStream;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
@@ -59,6 +60,7 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -82,9 +84,6 @@ public class PubQueueCacheTest {
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
 
-    @Captor
-    private ArgumentCaptor<String> headAssignCaptor;
-
     @Mock
     private EventAdmin eventAdmin;
 
@@ -106,31 +105,42 @@ public class PubQueueCacheTest {
     @Mock
     private Closeable poller;
 
+    @Mock
+    private MessageSender<Object> sender;
+
+    @Mock
+    private QueueCacheSeeder seeder;
+
     private PubQueueCache cache;
 
     private ExecutorService executor;
 
     private MessageHandler<PackageMessage> tailHandler;
 
+
     @Before
     public void before() {
-        when(clientProvider.assignTo(anyLong())).then(
-                answer -> "0:" + answer.getArguments()[0]);
         when(clientProvider.createPoller(
                 eq(TOPIC),
-                any(Reset.class),
-                headAssignCaptor.capture(),
+                eq(Reset.latest),
+                handlerCaptor.capture()))
+                .thenReturn(poller);
+        when(clientProvider.createPoller(
+                eq(TOPIC),
+                eq(Reset.earliest),
+                Mockito.anyString(),
                 handlerCaptor.capture()))
                 .thenReturn(poller);
+        when(clientProvider.createSender(Mockito.anyString()))
+            .thenReturn(sender);
 
         when(distributionMetricsService.getQueueCacheFetchCount())
-                .thenReturn(counter);
+            .thenReturn(counter);
 
         when(seedStore.load(anyString(), any())).thenReturn(0L);
 
-        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder);
-        cache.storeSeed();
-
+        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder);
+        verify(seeder).startSeeding();
         executor = Executors.newFixedThreadPool(10);
         tailHandler = handlerCaptor.getValue().getHandler();
     }
@@ -159,17 +169,17 @@ public class PubQueueCacheTest {
         Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
         // wait that the consumer has started fetching the offsets from 100 to 200
-        awaitHeadHandler();
+        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
-        long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset());
+        simulateMessages(headHandler, 100, cache.getMinOffset());
         // the consumer returns the offset queue
         consumer.get(15, SECONDS);
         assertEquals(100, cache.getMinOffset());
     }
 
     private MessageHandler<PackageMessage> awaitHeadHandler() {
-        return Awaitility.await().ignoreExceptions().until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+        return Awaitility.await().ignoreExceptions()
+                .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
     }
 
 	@Test
@@ -182,8 +192,7 @@ public class PubQueueCacheTest {
         // wait that one consumer has started fetching the offsets from 100 to 200
         MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
-        long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(headHandler, fromOffset, cache.getMinOffset());
+        simulateMessages(headHandler, 100, cache.getMinOffset());
         // both consumers returns the offset queue
         OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
         OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
@@ -205,7 +214,8 @@ public class PubQueueCacheTest {
     }
 
     private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
-        LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
+        LongStream.rangeClosed(fromOffset, toOffset)
+            .forEach(offset -> simulateMessage(handler, offset));
     }
     
     private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) {
@@ -242,10 +252,4 @@ public class PubQueueCacheTest {
         return c[RAND.nextInt(c.length)];
     }
 
-    private Long offsetFromAssign(String assign) {
-        String[] chunks = assign.split(":");
-        return Long.parseLong(chunks[1]);
-    }
-
-
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 846386c..da13e6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -29,7 +29,6 @@ import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.UUID;
 
 import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
@@ -40,7 +39,6 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
@@ -51,13 +49,10 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.settings.SlingSettingsService;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -80,9 +75,6 @@ public class PubQueueProviderTest {
     @Mock
     private MessagingProvider clientProvider;
     
-    @Mock
-    private SlingSettingsService slingSettings;
-    
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
 
@@ -101,8 +93,6 @@ public class PubQueueProviderTest {
     @Mock
     private MessageSender<Object> sender;
 
-    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
-
     private PubQueueCacheService pubQueueCacheService;
 
     private MessageHandler<PackageMessage> handler;
@@ -117,7 +107,6 @@ public class PubQueueProviderTest {
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
                 Mockito.any(Reset.class),
-                Mockito.anyString(),
                 handlerCaptor.capture()))
         .thenReturn(poller);
         when(clientProvider.createPoller(
@@ -128,11 +117,7 @@ public class PubQueueProviderTest {
         when(clientProvider.createSender(Mockito.anyString()))
         .thenReturn(sender);
         Topics topics = new Topics();
-        String slingId = UUID.randomUUID().toString();
-        when(slingSettings.getSlingId()).thenReturn(slingId);
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId);
-        seedStore.store("offset", 1L);
-        pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
+        pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
         queueProvider.activate();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
index 2bddbe7..29a5133 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -18,28 +18,14 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
-import static java.lang.System.currentTimeMillis;
-import static org.apache.sling.distribution.journal.shared.Topics.PACKAGE_TOPIC;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.function.LongConsumer;
 
-import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,62 +33,33 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.runners.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class QueueCacheSeederTest {
 
-    @Mock
-    private MessagingProvider clientProvider;
-
-    @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor;
-
     @Captor
     private ArgumentCaptor<PackageMessage> pkgMsgCaptor;
 
     @Mock
-    private Closeable poller;
-
-    @Mock
     private MessageSender<PackageMessage> sender;
 
-    @Mock
-    private LongConsumer callback;
-
     private QueueCacheSeeder seeder;
 
     @Before
     public void before() {
         MockitoAnnotations.initMocks(this);
-        when(clientProvider.createPoller(
-                eq(PACKAGE_TOPIC),
-                any(Reset.class),
-                pkgHandlerCaptor.capture()))
-                .thenReturn(poller);
         doNothing().when(sender).send(pkgMsgCaptor.capture());
-        when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC)))
-                .thenReturn(sender);
-        seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
-    }
-
-    @Test
-    public void testSeededCallback() throws IOException {
-        seeder.seed(callback);
-        long offset = 15L;
-        simulateSeedingMsg(offset);
-        verify(callback).accept(offset);
-        verify(poller).close();
+        seeder = new QueueCacheSeeder(sender);
     }
 
     @Test
-    public void testSendingSeeds() {
-        seeder.seed(callback);
-        verify(sender, timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture());
-        PackageMessage seedMsg = pkgMsgCaptor.getValue();
-        assertNotNull(seedMsg);
-        assertEquals(ReqType.TEST, seedMsg.getReqType());
+    public void testSeeding() throws IOException {
+        seeder.startSeeding();
+        
+        verify(sender, timeout(1000)).send(Mockito.anyObject());
     }
 
     @After
@@ -110,10 +67,4 @@ public class QueueCacheSeederTest {
         seeder.close();
     }
 
-    private void simulateSeedingMsg(long offset) {
-        PackageMessage msg = seeder.createTestMessage();
-        pkgHandlerCaptor.getValue().getHandler().handle(
-                new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis()),
-                msg);
-    }
-}
\ No newline at end of file
+}