You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/13 12:52:44 UTC

[sling-org-apache-sling-distribution-journal] 02/04: SLING-9583 - Combine parameters into value object

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9583
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit dac592ab7a227b6b05fd0955d853f67280ca6432
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Jul 10 17:03:18 2020 +0200

    SLING-9583 - Combine parameters into value object
---
 .../impl/publisher/DistributionPublisher.java      |  9 ++--
 .../journal/impl/queue/PubQueueProvider.java       |  4 +-
 .../distribution/journal/impl/queue/QueueId.java   | 53 ++++++++++++++++++++++
 .../impl/queue/impl/PubQueueProviderImpl.java      | 25 +++++-----
 .../impl/publisher/DistributionPublisherTest.java  |  3 +-
 .../impl/queue/impl/PubQueueProviderTest.java      | 12 +++--
 6 files changed, 80 insertions(+), 26 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 6b9e77a..2c01d7b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -47,6 +47,7 @@ import javax.management.NotCompliantMBeanException;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.AgentState;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
@@ -250,7 +251,8 @@ public class DistributionPublisher implements DistributionAgent {
     @Nonnull
     private DistributionQueue getErrorQueue(String queueName) {
         AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
-        return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+        QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+        return pubQueueProvider.getErrorQueue(queueId);
     }
 
     @CheckForNull
@@ -259,13 +261,12 @@ public class DistributionPublisher implements DistributionAgent {
         AgentId subAgentId = new AgentId(queueName);
         State state = view.getState(subAgentId.getAgentId(), pubAgentName);
         if (state != null) {
-            return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1, state.getRetries(), state.isEditable());
+            QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+            return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable());
         }
         return null;
     }
 
-
-
     @Nonnull
     @Override
     public DistributionLog getLog() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
index 178f6f2..f3bac6b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
@@ -27,9 +27,9 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue;
 public interface PubQueueProvider {
 
     @Nonnull
-    DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
+    DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable);
 
     @Nonnull
-    DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
+    DistributionQueue getErrorQueue(QueueId queueId);
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java
new file mode 100644
index 0000000..61f33d3
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue;
+
+public class QueueId {
+    private final String pubAgentName;
+    private final String subSlingId;
+    private final String subAgentName;
+    private final String queueName;
+    
+    public QueueId(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
+        this.pubAgentName = pubAgentName;
+        this.subSlingId = subSlingId;
+        this.subAgentName = subAgentName;
+        this.queueName = queueName;
+    }
+    
+    public String getPubAgentName() {
+        return pubAgentName;
+    }
+    
+    public String getSubSlingId() {
+        return subSlingId;
+    }
+    
+    public String getSubAgentName() {
+        return subAgentName;
+    }
+    
+    public String getQueueName() {
+        return queueName;
+    }
+    
+    public String getErrorQueueKey() {
+        return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index c019c09..f9dcc45 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
 import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -105,17 +106,17 @@ public class PubQueueProviderImpl implements PubQueueProvider {
 
     @Nonnull
     @Override
-    public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
-        OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
+    public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) {
+        OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset);
+        ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
         ClearCallback callback = editable ? editableCallback : null;
-        return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
+        return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
     }
 
     @Nonnull
     @Override
-    public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
-        String errorQueueKey = errorQueueKey(pubAgentName, subSlingId, subAgentName);
+    public DistributionQueue getErrorQueue(QueueId queueId) {
+        String errorQueueKey = queueId.getErrorQueueKey();
         OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
         long headOffset = errorQueue.getHeadOffset();
         final OffsetQueue<DistributionQueueItem> agentQueue;
@@ -123,25 +124,21 @@ public class PubQueueProviderImpl implements PubQueueProvider {
             agentQueue = new OffsetQueueImpl<>();
         } else {
             long minReferencedOffset = errorQueue.getItem(headOffset);
-            agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset);
+            agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minReferencedOffset);
         }
 
-        return new PubErrQueue(queueName, agentQueue, errorQueue);
+        return new PubErrQueue(queueId.getQueueName(), agentQueue, errorQueue);
     }
 
     public void handleStatus(MessageInfo info, PackageStatusMessage message) {
         if (message.getStatus() == Status.REMOVED_FAILED) {
-            String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
+            QueueId queueId = new QueueId(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName(), "");
+            String errorQueueKey = queueId.getErrorQueueKey();
             OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
             errorQueue.putItem(info.getOffset(), message.getOffset());
         }
     }
 
-    @Nonnull
-    private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
-        return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
-    }
-
     private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
         ClearCommand commandMessage = ClearCommand.builder()
                 .subSlingId(subSlingId)
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 9f9de96..0a66ad7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -262,8 +262,7 @@ public class DistributionPublisherTest {
         when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
         State state = stateWithMaxRetries(1);
         when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
-        AgentId subAgentId = new AgentId(QUEUE_NAME);
-        when(pubQueueProvider.getQueue(PUB1AGENT1, subAgentId.getSlingId(), subAgentId.getAgentName(), QUEUE_NAME, 2, 0, false))
+        when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false)))
             .thenThrow(new RuntimeException("Error"));
 
         Counter counter = new TestCounter();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index da13e6b..00eae27 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -45,6 +45,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -140,13 +141,14 @@ public class PubQueueProviderTest {
         handler.handle(info(2L), packageMessage("packageid3", PUB1_AGENT_NAME));
         
         // Full pub1 queue contains all packages from pub1
-        DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 0, -1, false);
+        QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false);
         Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
         assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
         
         // With offset 1 first package is removed
-        DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 1, -1, false);
+        DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false);
         Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
         assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
         assertThat(it2.hasNext(), equalTo(false));
@@ -161,7 +163,8 @@ public class PubQueueProviderTest {
     
     @Test
     public void testEmptyErrorQueue() throws Exception {
-        DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        DistributionQueue queue = queueProvider.getErrorQueue(queueId);
         assertThat(queue.getStatus().getItemsCount(), equalTo(0));
     }
     
@@ -176,7 +179,8 @@ public class PubQueueProviderTest {
         PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
         statHandler.handle(info, statusMsg1);
         
-        DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+        DistributionQueue queue = queueProvider.getErrorQueue(queueId);
         assertThat(queue.getStatus().getItemsCount(), equalTo(1));
         DistributionQueueEntry head = queue.getHead();
         DistributionQueueItem item = head.getItem();