You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/23 08:49:56 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9577 - Switch to seeding thread (#52)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 766733f SLING-9577 - Switch to seeding thread (#52)
766733f is described below
commit 766733fe1da9eb79ae33ba25fa4355301f49f49b
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Thu Jul 23 10:49:45 2020 +0200
SLING-9577 - Switch to seeding thread (#52)
* SLING-9577 - Switch to seeding thread
* SLING-9577 - Remove unnecessary parameters
* SLING-9577 - Remove unnecessary logging
* SLING-9577 - Add count so we can monitor the number of seeding messages
* SLING-9577 - Start counting with 1
* SLING-9577 - Limit maximum number of messages sent on seeder thread
* SLING-9577 - Use exponential backoff and unlimited messages on seeder thread
---
.../journal/impl/queue/impl/PubQueueCache.java | 59 ++------------
.../impl/queue/impl/PubQueueCacheService.java | 35 ++-------
.../journal/impl/queue/impl/QueueCacheSeeder.java | 90 +++++++++++-----------
.../impl/queue/impl/QueueCacheSeederTask.java | 61 ---------------
.../journal/impl/queue/impl/PubQueueCacheTest.java | 54 +++++++------
.../impl/queue/impl/PubQueueProviderTest.java | 17 +---
.../impl/queue/impl/QueueCacheSeederTest.java | 63 ++-------------
7 files changed, 96 insertions(+), 283 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 940313b..fdd6ff7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -37,13 +37,11 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.LocalStore;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -102,12 +100,6 @@ public class PubQueueCache {
*/
private final AtomicLong maxOffset = new AtomicLong(-1L);
- /**
- * Holds the last known seed offset stored to the
- * seed store.
- */
- private volatile long seedOffset = -1L;
-
private final Set<JMXRegistration> jmxRegs = new HashSet<>();
private final MessagingProvider messagingProvider;
@@ -120,44 +112,23 @@ public class PubQueueCache {
private final String topic;
- private final LocalStore seedStore;
-
private final DistributionMetricsService distributionMetricsService;
- public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) {
+ public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
this.distributionMetricsService = distributionMetricsService;
this.topic = topic;
- this.seedStore = seedStore;
this.seeder = seeder;
- Long offset = seedStore.load("offset", Long.class);
- if (offset != null) {
- seedOffset = offset;
- startPoller(seedOffset);
- /*
- * We need at least one seeding message
- * for cases where the seedOffset is no
- * longer on the journal.
- */
- seeder.seedOne();
- } else {
- /*
- * Fallback to seeding messages when
- * no offset could be found in the
- * repository.
- */
- seeder.seed(this::startPoller);
- }
+ startPoller();
+ this.seeder.startSeeding();
}
- private void startPoller(long offset) {
- LOG.info("Seed with offset: {}", offset);
- String assignTo = messagingProvider.assignTo(offset);
+ private void startPoller() {
+ LOG.info("Starting consumer");
tailPoller = messagingProvider.createPoller(
this.topic,
- Reset.earliest,
- assignTo,
+ Reset.latest,
create(PackageMessage.class, this::handlePackage)
);
}
@@ -171,23 +142,6 @@ public class PubQueueCache {
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
- public void storeSeed() {
- long newSeed = maxOffset.longValue();
- if (newSeed > seedOffset) {
- storeSeed(newSeed);
- seedOffset = newSeed;
- }
- }
-
- private void storeSeed(long offset) {
- LOG.info("Store seed offset {}", offset);
- try {
- seedStore.store("offset", offset);
- } catch (PersistenceException e) {
- LOG.warn("Failed to persist seed offset", e);
- }
- }
-
public int size() {
return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
}
@@ -320,6 +274,7 @@ public class PubQueueCache {
}
private void handlePackage(final MessageInfo info, final PackageMessage message) {
+ seeder.close();
merge(singletonList(new FullMessage<>(info, message)));
updateMaxOffset(info.getOffset());
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index f9291ed..b8d6692 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -21,16 +21,15 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.settings.SlingSettingsService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -75,35 +74,20 @@ public class PubQueueCacheService {
@Reference
private DistributionMetricsService distributionMetricsService;
- @Reference
- private SlingSettingsService slingSettings;
-
- @Reference
- private ResourceResolverFactory resolverFactory;
-
private volatile PubQueueCache cache;
- private String pubSlingId;
-
public PubQueueCacheService() {}
public PubQueueCacheService(MessagingProvider messagingProvider,
Topics topics,
- EventAdmin eventAdmin,
- SlingSettingsService slingSettingsService,
- ResourceResolverFactory resolverFactory,
- String pubSlingId) {
+ EventAdmin eventAdmin) {
this.messagingProvider = messagingProvider;
this.topics = topics;
this.eventAdmin = eventAdmin;
- this.slingSettings = slingSettingsService;
- this.resolverFactory = resolverFactory;
- this.pubSlingId = pubSlingId;
}
@Activate
public void activate() {
- pubSlingId = slingSettings.getSlingId();
cache = newCache();
LOG.info("Started Publisher queue cache service");
}
@@ -145,17 +129,10 @@ public class PubQueueCacheService {
}
}
- public void storeSeed() {
- PubQueueCache queueCache = this.cache;
- if (queueCache != null) {
- queueCache.storeSeed();
- }
- }
-
private PubQueueCache newCache() {
- LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
String topic = topics.getPackageTopic();
- QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic);
- return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder);
+ MessageSender<PackageMessage> sender = messagingProvider.createSender(topic);
+ QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
+ return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
index e3ac96c..26a6eaa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -18,25 +18,18 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
import java.io.Closeable;
import java.util.UUID;
-import java.util.function.LongConsumer;
-import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-
public class QueueCacheSeeder implements Closeable {
-
-
private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);
/**
@@ -44,45 +37,48 @@ public class QueueCacheSeeder implements Closeable {
*/
private static final long CACHE_SEEDING_DELAY_MS = 10_000;
- private final String topic;
-
- private final MessagingProvider provider;
-
- private volatile Closeable poller;
+ private static final int MAX_CACHE_SEEDING_DELAY_MS = 900_000; // 15 minutes
private volatile boolean closed;
- public QueueCacheSeeder(MessagingProvider provider, String topic) {
- this.provider = provider;
- this.topic = topic;
- }
+ private MessageSender<PackageMessage> sender;
- public void seedOne() {
- startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed");
+ private Thread seedingThread;
+
+ public QueueCacheSeeder(MessageSender<PackageMessage> sender) {
+ this.sender = sender;
}
- public void seed(LongConsumer callback) {
- poller = provider.createPoller(topic, Reset.latest,
- create(PackageMessage.class, (info, msg) -> {
- close();
- callback.accept(info.getOffset());
- }));
- startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+ public void startSeeding() {
+ seedingThread = startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
}
@Override
public void close() {
- closed = true;
- IOUtils.closeQuietly(poller);
+ if (!closed) {
+ closed = true;
+ try {
+ seedingThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
+ /**
+ * We repeatedly send seeding messages as the first message is sometimes not received by the consumer.
+ */
private void sendSeedingMessages() {
LOG.info("Start message seeder");
+ int count = 1;
+ long cacheSeedingDelay = CACHE_SEEDING_DELAY_MS;
try {
- MessageSender<PackageMessage> sender = provider.createSender(topic);
while (!closed) {
- sendSeedingMessage(sender);
- delay(CACHE_SEEDING_DELAY_MS);
+ LOG.info("Send seeding message {} then wait {} ms", count, cacheSeedingDelay);
+ sendSeedingMessage();
+ delay(cacheSeedingDelay);
+ cacheSeedingDelay = Math.min(cacheSeedingDelay * 2, MAX_CACHE_SEEDING_DELAY_MS);
+ count++;
}
} finally {
LOG.info("Stop message seeder");
@@ -90,25 +86,31 @@ public class QueueCacheSeeder implements Closeable {
}
private void sendSeedingMessage() {
- sendSeedingMessage(provider.createSender(topic));
- }
-
- private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
- PackageMessage pkgMsg = createTestMessage();
- LOG.info("Send seeding message");
try {
+ PackageMessage pkgMsg = createTestMessage();
sender.send(pkgMsg);
} catch (MessagingException e) {
LOG.warn(e.getMessage(), e);
- delay(CACHE_SEEDING_DELAY_MS * 10);
}
}
- private static void delay(long sleepMs) {
- try {
- Thread.sleep(sleepMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ /**
+ * Sleep with handling of interrupts and quick exit in case of closed.
+ * We do not interrupt the seeder thread from outside as this sometimes fails in the messaging impl code.
+ *
+ * @param sleepMs milliseconds to sleep
+ */
+ private void delay(long sleepMs) {
+ long sleepCycles = sleepMs / 100;
+ for (int curCycle=0; curCycle < sleepCycles; curCycle++) {
+ if (closed) {
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
deleted file mode 100644
index b18ee55..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
-import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
-
-/**
- * Periodical task to persist a cache seed
- * to the repository. The task must run only
- * on the leader instance to avoid concurrent
- * writes and reduce write operations in
- * clustered deployments.
- */
-@Component(
- service = Runnable.class,
- property = {
- PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
- PROPERTY_SCHEDULER_RUN_ON + "=" + VALUE_RUN_ON_LEADER,
- PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes
- })
-@ParametersAreNonnullByDefault
-public class QueueCacheSeederTask implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeederTask.class);
-
- @Reference
- private PubQueueCacheService queueCacheService;
-
- @Override
- public void run() {
- LOG.debug("Starting package cache seeder task");
- queueCacheService.storeSeed();
- LOG.debug("Stopping package cache seeder task");
- }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 0c9b634..aca7d01 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Closeable;
@@ -42,6 +42,7 @@ import java.util.stream.LongStream;
import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
@@ -59,6 +60,7 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -82,9 +84,6 @@ public class PubQueueCacheTest {
@Captor
private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
- @Captor
- private ArgumentCaptor<String> headAssignCaptor;
-
@Mock
private EventAdmin eventAdmin;
@@ -106,31 +105,42 @@ public class PubQueueCacheTest {
@Mock
private Closeable poller;
+ @Mock
+ private MessageSender<Object> sender;
+
+ @Mock
+ private QueueCacheSeeder seeder;
+
private PubQueueCache cache;
private ExecutorService executor;
private MessageHandler<PackageMessage> tailHandler;
+
@Before
public void before() {
- when(clientProvider.assignTo(anyLong())).then(
- answer -> "0:" + answer.getArguments()[0]);
when(clientProvider.createPoller(
eq(TOPIC),
- any(Reset.class),
- headAssignCaptor.capture(),
+ eq(Reset.latest),
+ handlerCaptor.capture()))
+ .thenReturn(poller);
+ when(clientProvider.createPoller(
+ eq(TOPIC),
+ eq(Reset.earliest),
+ Mockito.anyString(),
handlerCaptor.capture()))
.thenReturn(poller);
+ when(clientProvider.createSender(Mockito.anyString()))
+ .thenReturn(sender);
when(distributionMetricsService.getQueueCacheFetchCount())
- .thenReturn(counter);
+ .thenReturn(counter);
when(seedStore.load(anyString(), any())).thenReturn(0L);
- cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder);
- cache.storeSeed();
-
+ cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder);
+ verify(seeder).startSeeding();
executor = Executors.newFixedThreadPool(10);
tailHandler = handlerCaptor.getValue().getHandler();
}
@@ -159,17 +169,17 @@ public class PubQueueCacheTest {
Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
// wait that the consumer has started fetching the offsets from 100 to 200
- awaitHeadHandler();
+ MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
// simulate messages for the fetched offsets
- long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
- simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset());
+ simulateMessages(headHandler, 100, cache.getMinOffset());
// the consumer returns the offset queue
consumer.get(15, SECONDS);
assertEquals(100, cache.getMinOffset());
}
private MessageHandler<PackageMessage> awaitHeadHandler() {
- return Awaitility.await().ignoreExceptions().until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+ return Awaitility.await().ignoreExceptions()
+ .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
}
@Test
@@ -182,8 +192,7 @@ public class PubQueueCacheTest {
// wait that one consumer has started fetching the offsets from 100 to 200
MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
// simulate messages for the fetched offsets
- long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
- simulateMessages(headHandler, fromOffset, cache.getMinOffset());
+ simulateMessages(headHandler, 100, cache.getMinOffset());
// both consumers returns the offset queue
OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
@@ -205,7 +214,8 @@ public class PubQueueCacheTest {
}
private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
- LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
+ LongStream.rangeClosed(fromOffset, toOffset)
+ .forEach(offset -> simulateMessage(handler, offset));
}
private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) {
@@ -242,10 +252,4 @@ public class PubQueueCacheTest {
return c[RAND.nextInt(c.length)];
}
- private Long offsetFromAssign(String assign) {
- String[] chunks = assign.split(":");
- return Long.parseLong(chunks[1]);
- }
-
-
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 846386c..da13e6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -29,7 +29,6 @@ import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
-import java.util.UUID;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
@@ -40,7 +39,6 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
@@ -51,13 +49,10 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.LocalStore;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.settings.SlingSettingsService;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -80,9 +75,6 @@ public class PubQueueProviderTest {
@Mock
private MessagingProvider clientProvider;
- @Mock
- private SlingSettingsService slingSettings;
-
@Captor
private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@@ -101,8 +93,6 @@ public class PubQueueProviderTest {
@Mock
private MessageSender<Object> sender;
- private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
-
private PubQueueCacheService pubQueueCacheService;
private MessageHandler<PackageMessage> handler;
@@ -117,7 +107,6 @@ public class PubQueueProviderTest {
when(clientProvider.createPoller(
Mockito.eq(Topics.PACKAGE_TOPIC),
Mockito.any(Reset.class),
- Mockito.anyString(),
handlerCaptor.capture()))
.thenReturn(poller);
when(clientProvider.createPoller(
@@ -128,11 +117,7 @@ public class PubQueueProviderTest {
when(clientProvider.createSender(Mockito.anyString()))
.thenReturn(sender);
Topics topics = new Topics();
- String slingId = UUID.randomUUID().toString();
- when(slingSettings.getSlingId()).thenReturn(slingId);
- LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId);
- seedStore.store("offset", 1L);
- pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
+ pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
pubQueueCacheService.activate();
queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
queueProvider.activate();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
index 2bddbe7..29a5133 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -18,28 +18,14 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
-import static java.lang.System.currentTimeMillis;
-import static org.apache.sling.distribution.journal.shared.Topics.PACKAGE_TOPIC;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import java.io.Closeable;
import java.io.IOException;
-import java.util.function.LongConsumer;
-import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -47,62 +33,33 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class QueueCacheSeederTest {
- @Mock
- private MessagingProvider clientProvider;
-
- @Captor
- private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor;
-
@Captor
private ArgumentCaptor<PackageMessage> pkgMsgCaptor;
@Mock
- private Closeable poller;
-
- @Mock
private MessageSender<PackageMessage> sender;
- @Mock
- private LongConsumer callback;
-
private QueueCacheSeeder seeder;
@Before
public void before() {
MockitoAnnotations.initMocks(this);
- when(clientProvider.createPoller(
- eq(PACKAGE_TOPIC),
- any(Reset.class),
- pkgHandlerCaptor.capture()))
- .thenReturn(poller);
doNothing().when(sender).send(pkgMsgCaptor.capture());
- when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC)))
- .thenReturn(sender);
- seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
- }
-
- @Test
- public void testSeededCallback() throws IOException {
- seeder.seed(callback);
- long offset = 15L;
- simulateSeedingMsg(offset);
- verify(callback).accept(offset);
- verify(poller).close();
+ seeder = new QueueCacheSeeder(sender);
}
@Test
- public void testSendingSeeds() {
- seeder.seed(callback);
- verify(sender, timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture());
- PackageMessage seedMsg = pkgMsgCaptor.getValue();
- assertNotNull(seedMsg);
- assertEquals(ReqType.TEST, seedMsg.getReqType());
+ public void testSeeding() throws IOException {
+ seeder.startSeeding();
+
+ verify(sender, timeout(1000)).send(Mockito.anyObject());
}
@After
@@ -110,10 +67,4 @@ public class QueueCacheSeederTest {
seeder.close();
}
- private void simulateSeedingMsg(long offset) {
- PackageMessage msg = seeder.createTestMessage();
- pkgHandlerCaptor.getValue().getHandler().handle(
- new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis()),
- msg);
- }
-}
\ No newline at end of file
+}