You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/07/13 12:52:44 UTC
[sling-org-apache-sling-distribution-journal] 02/04: SLING-9583 - Combine parameters into value object
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-9583
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit dac592ab7a227b6b05fd0955d853f67280ca6432
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri Jul 10 17:03:18 2020 +0200
SLING-9583 - Combine parameters into value object
---
.../impl/publisher/DistributionPublisher.java | 9 ++--
.../journal/impl/queue/PubQueueProvider.java | 4 +-
.../distribution/journal/impl/queue/QueueId.java | 53 ++++++++++++++++++++++
.../impl/queue/impl/PubQueueProviderImpl.java | 25 +++++-----
.../impl/publisher/DistributionPublisherTest.java | 3 +-
.../impl/queue/impl/PubQueueProviderTest.java | 12 +++--
6 files changed, 80 insertions(+), 26 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 6b9e77a..2c01d7b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -47,6 +47,7 @@ import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.shared.AgentState;
import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
@@ -250,7 +251,8 @@ public class DistributionPublisher implements DistributionAgent {
@Nonnull
private DistributionQueue getErrorQueue(String queueName) {
AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, "-error"));
- return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+ QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+ return pubQueueProvider.getErrorQueue(queueId);
}
@CheckForNull
@@ -259,13 +261,12 @@ public class DistributionPublisher implements DistributionAgent {
AgentId subAgentId = new AgentId(queueName);
State state = view.getState(subAgentId.getAgentId(), pubAgentName);
if (state != null) {
- return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1, state.getRetries(), state.isEditable());
+ QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
+ return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable());
}
return null;
}
-
-
@Nonnull
@Override
public DistributionLog getLog() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
index 178f6f2..f3bac6b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
@@ -27,9 +27,9 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue;
public interface PubQueueProvider {
@Nonnull
- DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable);
+ DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable);
@Nonnull
- DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName);
+ DistributionQueue getErrorQueue(QueueId queueId);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java
new file mode 100644
index 0000000..61f33d3
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue;
+
+public class QueueId {
+ private final String pubAgentName;
+ private final String subSlingId;
+ private final String subAgentName;
+ private final String queueName;
+
+ public QueueId(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
+ this.pubAgentName = pubAgentName;
+ this.subSlingId = subSlingId;
+ this.subAgentName = subAgentName;
+ this.queueName = queueName;
+ }
+
+ public String getPubAgentName() {
+ return pubAgentName;
+ }
+
+ public String getSubSlingId() {
+ return subSlingId;
+ }
+
+ public String getSubAgentName() {
+ return subAgentName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getErrorQueueKey() {
+ return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index c019c09..f9dcc45 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
import org.apache.sling.distribution.journal.messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -105,17 +106,17 @@ public class PubQueueProviderImpl implements PubQueueProvider {
@Nonnull
@Override
- public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
- OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
- ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
+ public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) {
+ OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset);
+ ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset);
ClearCallback callback = editable ? editableCallback : null;
- return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
+ return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
}
@Nonnull
@Override
- public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
- String errorQueueKey = errorQueueKey(pubAgentName, subSlingId, subAgentName);
+ public DistributionQueue getErrorQueue(QueueId queueId) {
+ String errorQueueKey = queueId.getErrorQueueKey();
OffsetQueue<Long> errorQueue = errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<>());
long headOffset = errorQueue.getHeadOffset();
final OffsetQueue<DistributionQueueItem> agentQueue;
@@ -123,25 +124,21 @@ public class PubQueueProviderImpl implements PubQueueProvider {
agentQueue = new OffsetQueueImpl<>();
} else {
long minReferencedOffset = errorQueue.getItem(headOffset);
- agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset);
+ agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minReferencedOffset);
}
- return new PubErrQueue(queueName, agentQueue, errorQueue);
+ return new PubErrQueue(queueId.getQueueName(), agentQueue, errorQueue);
}
public void handleStatus(MessageInfo info, PackageStatusMessage message) {
if (message.getStatus() == Status.REMOVED_FAILED) {
- String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
+ QueueId queueId = new QueueId(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName(), "");
+ String errorQueueKey = queueId.getErrorQueueKey();
OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
errorQueue.putItem(info.getOffset(), message.getOffset());
}
}
- @Nonnull
- private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
- return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
- }
-
private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
ClearCommand commandMessage = ClearCommand.builder()
.subSlingId(subSlingId)
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 9f9de96..0a66ad7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -262,8 +262,7 @@ public class DistributionPublisherTest {
when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME));
State state = stateWithMaxRetries(1);
when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state);
- AgentId subAgentId = new AgentId(QUEUE_NAME);
- when(pubQueueProvider.getQueue(PUB1AGENT1, subAgentId.getSlingId(), subAgentId.getAgentName(), QUEUE_NAME, 2, 0, false))
+ when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false)))
.thenThrow(new RuntimeException("Error"));
Counter counter = new TestCounter();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index da13e6b..00eae27 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -45,6 +45,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.QueueId;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -140,13 +141,14 @@ public class PubQueueProviderTest {
handler.handle(info(2L), packageMessage("packageid3", PUB1_AGENT_NAME));
// Full pub1 queue contains all packages from pub1
- DistributionQueue queue = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 0, -1, false);
+ QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false);
Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator();
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1"));
assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3"));
// With offset 1 first package is removed
- DistributionQueue queue2 = queueProvider.getQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID, 1, -1, false);
+ DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false);
Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator();
assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3"));
assertThat(it2.hasNext(), equalTo(false));
@@ -161,7 +163,8 @@ public class PubQueueProviderTest {
@Test
public void testEmptyErrorQueue() throws Exception {
- DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ DistributionQueue queue = queueProvider.getErrorQueue(queueId);
assertThat(queue.getStatus().getItemsCount(), equalTo(0));
}
@@ -176,7 +179,8 @@ public class PubQueueProviderTest {
PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1);
statHandler.handle(info, statusMsg1);
- DistributionQueue queue = queueProvider.getErrorQueue(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID);
+ DistributionQueue queue = queueProvider.getErrorQueue(queueId);
assertThat(queue.getStatus().getItemsCount(), equalTo(1));
DistributionQueueEntry head = queue.getHead();
DistributionQueueItem item = head.getItem();