You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/13 12:52:45 UTC
[sling-org-apache-sling-distribution-journal] 03/04: SLING-9583 -
Extract messaging code from PubQueueProviderImpl
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-9583
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 5df16b72a9c4b8dc86f1d176878dd6ce11d804e3
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sun Jul 12 11:27:42 2020 +0200
SLING-9583 - Extract messaging code from PubQueueProviderImpl
---
.../impl/publisher/DistributionPublisher.java | 32 +++++++++++++-
.../impl/queue/{impl => }/ClearCallback.java | 2 +-
.../journal/impl/queue/PubQueueProvider.java | 7 +++-
.../journal/impl/queue/impl/PubQueue.java | 1 +
.../impl/queue/impl/PubQueueProviderImpl.java | 49 ++--------------------
.../impl/publisher/DistributionPublisherTest.java | 3 +-
.../impl/queue/impl/PubQueueProviderTest.java | 11 ++---
7 files changed, 49 insertions(+), 56 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2c01d7b..a317290 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -27,6 +27,7 @@ import static org.apache.sling.distribution.DistributionRequestType.DELETE;
import static org.apache.sling.distribution.DistributionRequestType.TEST;
import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
+import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
@@ -46,9 +47,12 @@ import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.queue.QueueId;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.shared.AgentState;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
@@ -77,6 +81,8 @@ import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
/**
@@ -136,11 +142,14 @@ public class DistributionPublisher implements DistributionAgent {
private ServiceRegistration<DistributionAgent> componentReg;
private Consumer<PackageMessage> sender;
+ private Consumer<ClearCommand> commandSender;
private JMXRegistration reg;
private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
+ private Closeable statusPoller;
+
public DistributionPublisher() {
log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
REQ_TYPES.put(ADD, this::sendAndWait);
@@ -159,6 +168,7 @@ public class DistributionPublisher implements DistributionAgent {
pkgType = packageBuilder.getType();
this.sender = messagingProvider.createSender(topics.getPackageTopic());
+ this.commandSender = messagingProvider.createSender(topics.getCommandTopic());
Dictionary<String, Object> props = createServiceProps(config);
componentReg = requireNonNull(context.registerService(DistributionAgent.class, this, props));
@@ -178,11 +188,19 @@ public class DistributionPublisher implements DistributionAgent {
"Current number of publish subscribers",
() -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
);
+
+ statusPoller = messagingProvider.createPoller(
+ topics.getStatusTopic(),
+ Reset.earliest,
+ HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus)
+ );
+
log.info(msg);
}
@Deactivate
public void deactivate() {
+ IOUtils.closeQuietly(statusPoller);
reg.close();
componentReg.unregister();
String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
@@ -262,10 +280,22 @@ public class DistributionPublisher implements DistributionAgent {
State state = view.getState(subAgentId.getAgentId(), pubAgentName);
if (state != null) {
QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
- return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable());
+ ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
+ ClearCallback clearCallback = state.isEditable() ? editableCallback : null;
+ return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), clearCallback);
}
return null;
}
+
+ private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
+ ClearCommand commandMessage = ClearCommand.builder()
+ .subSlingId(subSlingId)
+ .subAgentName(subAgentName)
+ .offset(offset)
+ .build();
+ log.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
+ commandSender.accept(commandMessage);
+ }
@Nonnull
@Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
index 0396b6e..ec10967 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.queue;
import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
index f3bac6b..bc349b9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
@@ -19,17 +19,22 @@
package org.apache.sling.distribution.journal.impl.queue;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@ParametersAreNonnullByDefault
public interface PubQueueProvider {
@Nonnull
- DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable);
+ DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, @Nullable ClearCallback clearCallback);
@Nonnull
DistributionQueue getErrorQueue(QueueId queueId);
+ void handleStatus(MessageInfo info, PackageStatusMessage message);
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 23aaf15..8689a39 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -39,6 +39,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index f9dcc45..f866d36 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -18,27 +18,19 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.queue.QueueId;
-import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.service.component.annotations.Activate;
@@ -63,54 +55,31 @@ public class PubQueueProviderImpl implements PubQueueProvider {
private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
@Reference
- private MessagingProvider messagingProvider;
-
- @Reference
- private Topics topics;
-
- @Reference
private PubQueueCacheService pubQueueCacheService;
- private Closeable statusPoller;
-
- private Consumer<ClearCommand> sender;
-
public PubQueueProviderImpl() {
}
public PubQueueProviderImpl(
- PubQueueCacheService pubQueueCacheService,
- MessagingProvider messagingProvider,
- Topics topics) {
+ PubQueueCacheService pubQueueCacheService) {
this.pubQueueCacheService = pubQueueCacheService;
- this.messagingProvider = messagingProvider;
- this.topics = topics;
}
@Activate
public void activate() {
- statusPoller = messagingProvider.createPoller(
- topics.getStatusTopic(),
- Reset.earliest,
- create(PackageStatusMessage.class, this::handleStatus)
- );
- sender = messagingProvider.createSender(topics.getCommandTopic());
LOG.info("Started Publisher queue provider service");
}
@Deactivate
public void deactivate() {
- IOUtils.closeQuietly(statusPoller);
LOG.info("Stopped Publisher queue provider service");
}
@Nonnull
@Override
- public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) {
+ public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, ClearCallback clearCallback) {
OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset);
- ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
- ClearCallback callback = editable ? editableCallback : null;
- return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
+ return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, clearCallback);
}
@Nonnull
@@ -139,14 +108,4 @@ public class PubQueueProviderImpl implements PubQueueProvider {
}
}
- private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
- ClearCommand commandMessage = ClearCommand.builder()
- .subSlingId(subSlingId)
- .subAgentName(subAgentName)
- .offset(offset)
- .build();
- LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
- sender.accept(commandMessage);
- }
-
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 0a66ad7..fe8268a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -54,6 +54,7 @@ import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
@@ -262,7 +263,7 @@ public class DistributionPublisherTest {
when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
State state = stateWithMaxRetries(1);
when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
- when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false)))
+ when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.isNull(ClearCallback.class)))
.thenThrow(new RuntimeException("Error"));
Counter counter = new TestCounter();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 00eae27..ea71824 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -97,7 +97,6 @@ public class PubQueueProviderTest {
private PubQueueCacheService pubQueueCacheService;
private MessageHandler<PackageMessage> handler;
- private MessageHandler<PackageStatusMessage> statHandler;
private PubQueueProviderImpl queueProvider;
private MBeanServer mbeanServer;
@@ -120,10 +119,9 @@ public class PubQueueProviderTest {
Topics topics = new Topics();
pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
pubQueueCacheService.activate();
- queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
+ queueProvider = new PubQueueProviderImpl(pubQueueCacheService);
queueProvider.activate();
handler = handlerCaptor.getValue().getHandler();
- statHandler = statHandlerCaptor.getValue().getHandler();
}
@After
@@ -131,7 +129,6 @@ public class PubQueueProviderTest {
pubQueueCacheService.deactivate();
queueProvider.deactivate();
verify(poller).close();
- verify(statPoller).close();
}
@Test
@@ -142,13 +139,13 @@ public class PubQueueProviderTest {
// Full pub1 queue contains all packages from pub1
QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
- DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false);
+ DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, null);
Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
// With offset 1 first package is removed
- DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false);
+ DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, null);
Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
assertThat(it2.hasNext(), equalTo(false));
@@ -177,7 +174,7 @@ public class PubQueueProviderTest {
MessageInfo info = info(1L);
handler.handle(info, pkgMsg1);
PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
- statHandler.handle(info, statusMsg1);
+ queueProvider.handleStatus(info, statusMsg1);
QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
DistributionQueue queue = queueProvider.getErrorQueue(queueId);