You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/13 12:52:45 UTC

[sling-org-apache-sling-distribution-journal] 03/04: SLING-9583 - Extract messaging code from PubQueueProviderImpl

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9583
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 5df16b72a9c4b8dc86f1d176878dd6ce11d804e3
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sun Jul 12 11:27:42 2020 +0200

    SLING-9583 - Extract messaging code from PubQueueProviderImpl
---
 .../impl/publisher/DistributionPublisher.java      | 32 +++++++++++++-
 .../impl/queue/{impl => }/ClearCallback.java       |  2 +-
 .../journal/impl/queue/PubQueueProvider.java       |  7 +++-
 .../journal/impl/queue/impl/PubQueue.java          |  1 +
 .../impl/queue/impl/PubQueueProviderImpl.java      | 49 ++--------------------
 .../impl/publisher/DistributionPublisherTest.java  |  3 +-
 .../impl/queue/impl/PubQueueProviderTest.java      | 11 ++---
 7 files changed, 49 insertions(+), 56 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2c01d7b..a317290 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -27,6 +27,7 @@ import static org.apache.sling.distribution.DistributionRequestType.DELETE;
 import static org.apache.sling.distribution.DistributionRequestType.TEST;
 import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 
+import java.io.Closeable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Dictionary;
@@ -46,9 +47,12 @@ import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.impl.queue.QueueId;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.shared.AgentState;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
@@ -77,6 +81,8 @@ import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
 /**
@@ -136,11 +142,14 @@ public class DistributionPublisher implements DistributionAgent {
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private Consumer<PackageMessage> sender;
+    private Consumer<ClearCommand> commandSender;
 
     private JMXRegistration reg;
 
     private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
 
+    private Closeable statusPoller;
+
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
         REQ_TYPES.put(ADD,    this::sendAndWait);
@@ -159,6 +168,7 @@ public class DistributionPublisher implements DistributionAgent {
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
+        this.commandSender = messagingProvider.createSender(topics.getCommandTopic());
         
         Dictionary<String, Object> props = createServiceProps(config);
         componentReg = requireNonNull(context.registerService(DistributionAgent.class, this, props));
@@ -178,11 +188,19 @@ public class DistributionPublisher implements DistributionAgent {
                 "Current number of publish subscribers",
                 () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
         );
+        
+        statusPoller = messagingProvider.createPoller(
+                topics.getStatusTopic(),
+                Reset.earliest,
+                HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus)
+                );
+        
         log.info(msg);
     }
 
     @Deactivate
     public void deactivate() {
+        IOUtils.closeQuietly(statusPoller);
         reg.close();
         componentReg.unregister();
         String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
@@ -262,10 +280,22 @@ public class DistributionPublisher implements DistributionAgent {
         State state = view.getState(subAgentId.getAgentId(), pubAgentName);
         if (state != null) {
             QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
-            return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable());
+            ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
+            ClearCallback clearCallback = state.isEditable() ? editableCallback : null;
+            return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), clearCallback);
         }
         return null;
     }
+    
+    private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
+        ClearCommand commandMessage = ClearCommand.builder()
+                .subSlingId(subSlingId)
+                .subAgentName(subAgentName)
+                .offset(offset)
+                .build();
+        log.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
+        commandSender.accept(commandMessage);
+    }
 
     @Nonnull
     @Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
similarity index 93%
rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
index 0396b6e..ec10967 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.queue;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
index f3bac6b..bc349b9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
@@ -19,17 +19,22 @@
 package org.apache.sling.distribution.journal.impl.queue;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 @ParametersAreNonnullByDefault
 public interface PubQueueProvider {
 
     @Nonnull
-    DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable);
+    DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, @Nullable ClearCallback clearCallback);
 
     @Nonnull
     DistributionQueue getErrorQueue(QueueId queueId);
 
+    void handleStatus(MessageInfo info, PackageStatusMessage message);
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 23aaf15..8689a39 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -39,6 +39,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index f9dcc45..f866d36 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -18,27 +18,19 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.Closeable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.impl.queue.QueueId;
-import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.osgi.service.component.annotations.Activate;
@@ -63,54 +55,31 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>();
 
     @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
-
-    @Reference
     private PubQueueCacheService pubQueueCacheService;
 
-    private Closeable statusPoller;
-
-    private Consumer<ClearCommand> sender;
-
     public PubQueueProviderImpl() {
     }
     
     public PubQueueProviderImpl(
-            PubQueueCacheService pubQueueCacheService,
-            MessagingProvider messagingProvider,
-            Topics topics) {
+            PubQueueCacheService pubQueueCacheService) {
         this.pubQueueCacheService = pubQueueCacheService;
-        this.messagingProvider = messagingProvider;
-        this.topics = topics;
     }
 
     @Activate
     public void activate() {
-        statusPoller = messagingProvider.createPoller(
-                topics.getStatusTopic(),
-                Reset.earliest,
-                create(PackageStatusMessage.class, this::handleStatus)
-                );
-        sender = messagingProvider.createSender(topics.getCommandTopic());
         LOG.info("Started Publisher queue provider service");
     }
 
     @Deactivate
     public void deactivate() {
-        IOUtils.closeQuietly(statusPoller);
         LOG.info("Stopped Publisher queue provider service");
     }
 
     @Nonnull
     @Override
-    public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) {
+    public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, ClearCallback clearCallback) {
         OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset);
-        ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
-        ClearCallback callback = editable ? editableCallback : null;
-        return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
+        return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, clearCallback);
     }
 
     @Nonnull
@@ -139,14 +108,4 @@ public class PubQueueProviderImpl implements PubQueueProvider {
         }
     }
 
-    private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
-        ClearCommand commandMessage = ClearCommand.builder()
-                .subSlingId(subSlingId)
-                .subAgentName(subAgentName)
-                .offset(offset)
-                .build();
-        LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
-        sender.accept(commandMessage);
-    }
-
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 0a66ad7..fe8268a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -54,6 +54,7 @@ import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.queue.ClearCallback;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
@@ -262,7 +263,7 @@ public class DistributionPublisherTest {
         when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
         State state = stateWithMaxRetries(1);
         when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
-        when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false)))
+        when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.isNull(ClearCallback.class)))
             .thenThrow(new RuntimeException("Error"));
 
         Counter counter = new TestCounter();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 00eae27..ea71824 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -97,7 +97,6 @@ public class PubQueueProviderTest {
     private PubQueueCacheService pubQueueCacheService;
 
     private MessageHandler<PackageMessage> handler;
-    private MessageHandler<PackageStatusMessage> statHandler;
 
     private PubQueueProviderImpl queueProvider;
     private MBeanServer mbeanServer;
@@ -120,10 +119,9 @@ public class PubQueueProviderTest {
         Topics topics = new Topics();
         pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
         pubQueueCacheService.activate();
-        queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
+        queueProvider = new PubQueueProviderImpl(pubQueueCacheService);
         queueProvider.activate();
         handler = handlerCaptor.getValue().getHandler();
-        statHandler = statHandlerCaptor.getValue().getHandler();
     }
 
     @After
@@ -131,7 +129,6 @@ public class PubQueueProviderTest {
         pubQueueCacheService.deactivate();
         queueProvider.deactivate();
         verify(poller).close();
-        verify(statPoller).close();
     }
     
     @Test
@@ -142,13 +139,13 @@ public class PubQueueProviderTest {
         
         // Full pub1 queue contains all packages from pub1
         QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
-        DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false);
+        DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, null);
         Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
         
         // With offset 1 first package is removed
-        DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false);
+        DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, null);
         Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
         assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
         assertThat(it2.hasNext(), equalTo(false));
@@ -177,7 +174,7 @@ public class PubQueueProviderTest {
         MessageInfo info = info(1L);
         handler.handle(info, pkgMsg1);
         PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
-        statHandler.handle(info, statusMsg1);
+        queueProvider.handleStatus(info, statusMsg1);
         
         QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
         DistributionQueue queue = queueProvider.getErrorQueue(queueId);