You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/25 18:50:22 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-8932 - Fix issues from sonar report (#21)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new b5b51f0 SLING-8932 - Fix issues from sonar report (#21)
b5b51f0 is described below
commit b5b51f02a4a6cd0c75c41e517edcaf75c6d1566d
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed Dec 25 19:50:14 2019 +0100
SLING-8932 - Fix issues from sonar report (#21)
---
.../journal/impl/event/DistributionEvent.java | 10 +++---
.../journal/impl/publisher/DistPublisherJMX.java | 12 ++++---
.../impl/publisher/PackageMessageFactory.java | 2 +-
.../journal/impl/publisher/PackageRepo.java | 12 ++++---
.../journal/impl/queue/QueueItemFactory.java | 21 +++++++-----
.../journal/impl/queue/impl/EntryUtil.java | 3 ++
.../journal/impl/queue/impl/PubErrQueue.java | 11 +++---
.../journal/impl/queue/impl/PubQueue.java | 10 +++---
.../journal/impl/queue/impl/PubQueueCache.java | 2 +-
.../impl/queue/impl/PubQueueProviderImpl.java | 31 ++++++++---------
.../journal/impl/queue/impl/QueueEntryFactory.java | 26 +++++++--------
.../journal/impl/queue/impl/RangePoller.java | 2 +-
.../journal/impl/queue/impl/SubQueue.java | 12 ++++---
.../journal/impl/shared/AgentState.java | 5 ++-
.../impl/shared/DistributionMetricsService.java | 2 +-
.../distribution/journal/impl/shared/Topics.java | 4 +--
.../journal/impl/subscriber/Announcer.java | 39 +++++++++++-----------
.../journal/impl/subscriber/BookKeeper.java | 28 +++++++++-------
.../journal/impl/subscriber/CommandPoller.java | 13 ++++----
.../impl/subscriber/DistributionSubscriber.java | 4 ++-
.../journal/impl/subscriber/PackageHandler.java | 2 +-
21 files changed, 136 insertions(+), 115 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
index 77cf31c..5811357 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
@@ -45,24 +45,26 @@ import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
public class DistributionEvent {
public static final String PACKAGE_ID = "distribution.package.id";
+ private static final String KIND_AGENT = "agent";
+ private static final String KIND_IMPORTER = "importer";
public static Event eventImporterImported(Messages.PackageMessage pkgMsg, String agentName) {
- return buildEvent(IMPORTER_PACKAGE_IMPORTED, "importer", agentName, pkgMsg);
+ return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName, pkgMsg);
}
public static Event eventPackageCreated(Messages.PackageMessage pkgMsg, String agentName) {
- return buildEvent(AGENT_PACKAGE_CREATED, "agent", agentName, pkgMsg);
+ return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName, pkgMsg);
}
public static Event eventPackageDistributed(DistributionQueueItem queueItem, String agentName) {
- return buildEvent(AGENT_PACKAGE_DISTRIBUTED, "agent", agentName,
+ return buildEvent(AGENT_PACKAGE_DISTRIBUTED, KIND_AGENT, agentName,
queueItem.get(PROPERTY_PACKAGE_TYPE, String.class),
queueItem.get(PROPERTY_REQUEST_PATHS, String[].class),
queueItem.getPackageId());
}
public static Event eventPackageQueued(Messages.PackageMessage pkgMsg, String agentName) {
- return buildEvent(AGENT_PACKAGE_QUEUED, "agent",agentName, pkgMsg);
+ return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg);
}
private static Event buildEvent(String topic, String kind, String agentName, PackageMessage pkgMsg) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
index e471ee6..9440ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
@@ -38,6 +38,8 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue;
class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
+ private static final String COL_ID = "ID";
+ private static final String COL_OFFSET = "offset";
private String pubAgentName;
private DiscoveryService discoveryService;
private DistributionPublisher distPublisher;
@@ -55,10 +57,10 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
@Override
public TabularData getOffsetPerSubscriber() throws MBeanException {
try {
- String[] itemNames = new String[] {"ID", "offset"};
+ String[] itemNames = new String[] {COL_ID, COL_OFFSET};
OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
CompositeType rowType = new CompositeType("Offsets", "Offsets by sub agent", itemNames, itemNames, itemTypes);
- TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+ TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
TabularDataSupport table = new TabularDataSupport(type);
Set<State> subscribedAgents = discoveryService.getTopologyView().getSubscribedAgents(pubAgentName);
for (State state : subscribedAgents) {
@@ -77,16 +79,16 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
@Override
public TabularData getQueue(String queueName) throws MBeanException {
try {
- String[] itemNames = new String[] {"ID", "offset"};
+ String[] itemNames = new String[] {COL_ID, COL_OFFSET};
OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
CompositeType rowType = new CompositeType("Offsets", "Queue Offsets", itemNames, itemNames, itemTypes);
- TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+ TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
TabularDataSupport table = new TabularDataSupport(type);
DistributionQueue queue = distPublisher.getQueue(queueName);
if (queue != null) {
for (DistributionQueueEntry item : queue.getEntries(0, 1000)) {
CompositeData row = new CompositeDataSupport(rowType, itemNames,
- new Object[] { item.getId(), item.getItem().get("offset")});
+ new Object[] { item.getId(), item.getItem().get(COL_OFFSET)});
table.put(row);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index fcd3869..2e7d65e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -129,7 +129,7 @@ public class PackageMessageFactory {
* always the case.
*/
- LOG.info(String.format("Package %s too large (%sB) to be sent inline", disPkg.getId(), pkgLength));
+ LOG.info("Package {} too large ({}B) to be sent inline", disPkg.getId(), pkgLength);
String pkgBinRef = packageRepo.store(resourceResolver, disPkg);
pkgBuilder.setPkgBinaryRef(pkgBinRef);
} else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index 0ab1e72..e87a960 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -59,6 +59,8 @@ import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
@ParametersAreNonnullByDefault
public class PackageRepo {
+ private static final String SLING_FOLDER = "sling:Folder";
+
@Reference
private ResourceResolverFactory resolverFactory;
@@ -84,7 +86,7 @@ public class PackageRepo {
try {
String pkgPath = String.format(PACKAGE_PATH_PATTERN, disPkg.getType(), disPkg.getId());
Resource pkgResource = ResourceUtil.getOrCreateResource(resolver,
- pkgPath, "sling:Folder", "sling:Folder", false);
+ pkgPath, SLING_FOLDER, SLING_FOLDER, false);
Node pkgNode = pkgResource.adaptTo(Node.class);
Node binNode = JcrUtils.getOrAddNode(pkgNode, "bin", NodeType.NT_FILE);
Node cntNode = JcrUtils.getOrAddNode(binNode, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
@@ -161,14 +163,14 @@ public class PackageRepo {
throws PersistenceException {
long offset = pkg.getValueMap().get("offset", -1);
if (offset < 0) {
- LOG.info(String.format("keep package %s, setting tail offset %s", pkg.getName(), tailOffset));
+ LOG.info("keep package {}, setting tail offset {}", pkg.getName(), tailOffset);
pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset);
} else if (offset < headOffset) {
- LOG.info(String.format("remove package %s, offset smaller than head offset %s < %s", pkg.getName(), offset, headOffset));
+ LOG.info("remove package {}, offset smaller than head offset {} < {}", pkg.getName(), offset, headOffset);
resolver.delete(pkg);
return 1;
} else {
- LOG.debug(String.format("keep package %s, offset bigger or equal to head offset %s >= %s", pkg.getName(), offset, headOffset));
+ LOG.debug("keep package {}, offset bigger or equal to head offset {} >= {}", pkg.getName(), offset, headOffset);
}
return 0;
}
@@ -176,6 +178,6 @@ public class PackageRepo {
@Nonnull
private Resource getRoot(ResourceResolver resolver)
throws PersistenceException {
- return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, "sling:Folder", "sling:Folder", true);
+ return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
index 5296b62..c9808d8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
@@ -52,6 +52,9 @@ public final class QueueItemFactory {
public static final String PACKAGE_MSG = "packageMessage";
private static final String REQUEST_USER_ID = "internal.request.user";
+
+ private QueueItemFactory() {
+ }
public static DistributionQueueItem fromPackage(FullMessage<PackageMessage> fMessage) {
return fromPackage(fMessage.getInfo(), fMessage.getMessage(), false);
@@ -85,17 +88,17 @@ public final class QueueItemFactory {
}
private static DistributionRequestType toDistReqType(ReqType reqType) {
- switch (reqType) {
- case ADD:
- return DistributionRequestType.ADD;
- case DELETE:
- return DistributionRequestType.DELETE;
+ switch (reqType) {
+ case ADD:
+ return DistributionRequestType.ADD;
+ case DELETE:
+ return DistributionRequestType.DELETE;
case TEST:
return DistributionRequestType.TEST;
- default:
- throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
- }
- }
+ default:
+ throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
+ }
+ }
@Nonnull
private static String[] toArray(List<String> paths) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
index 9ef3694..529fc52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
@@ -22,6 +22,9 @@ import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueItem;
public final class EntryUtil {
+
+ private EntryUtil() {
+ }
public static long entryOffset(String entryId) {
String[] chunks = entryId.split("@");
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index f6ed1cf..aaa7b1c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -40,6 +40,7 @@ import static java.util.Objects.requireNonNull;
@ParametersAreNonnullByDefault
public class PubErrQueue implements DistributionQueue {
+ private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
private static final Logger LOG = LoggerFactory.getLogger(PubErrQueue.class);
private final OffsetQueue<DistributionQueueItem> agentQueue;
@@ -54,7 +55,7 @@ public class PubErrQueue implements DistributionQueue {
this.queueName = requireNonNull(queueName);
this.agentQueue = requireNonNull(agentQueue);
this.errorQueue = requireNonNull(errorQueue);
- this.entryFactory = new QueueEntryFactory(queueName, (queueItem) -> 0);
+ this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
}
@Nonnull
@@ -87,7 +88,7 @@ public class PubErrQueue implements DistributionQueue {
if (queueItem != null) {
entries.add(entryFactory.create(queueItem));
} else {
- LOG.warn(String.format("queueItem at offset %s not found", refOffset));
+ LOG.warn("queueItem at offset {} not found", refOffset);
}
}
return entries;
@@ -103,19 +104,19 @@ public class PubErrQueue implements DistributionQueue {
@Override
public DistributionQueueEntry remove(@Nonnull String entryId) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
@Override
public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
@Override
public Iterable<DistributionQueueEntry> clear(int limit) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index b6874c5..93f0dca 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -103,8 +103,8 @@ public class PubQueue implements DistributionQueue {
@Override
public DistributionQueueEntry getHead() {
- DistributionQueueItem headItem = offsetQueue.getHeadItem();
- return entryFactory.create(headItem);
+ DistributionQueueItem queueItem = offsetQueue.getHeadItem();
+ return entryFactory.create(queueItem);
}
@Nonnull
@@ -209,9 +209,9 @@ public class PubQueue implements DistributionQueue {
return capabilities.contains(capability);
}
- private Integer attempts(DistributionQueueItem queueItem) {
- return queueItem.equals(headItem) ? retries : 0;
- }
+ private int attempts(DistributionQueueItem queueItem) {
+ return queueItem.equals(headItem) ? retries : 0;
+ }
private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
List<DistributionQueueEntry> removed = new ArrayList<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 438c765..75d71fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -209,7 +209,7 @@ public class PubQueueCache {
if (requestedMinOffset < cachedMinOffset) {
- LOG.debug(String.format("Requested min offset %s smaller than cached min offset %s", requestedMinOffset, cachedMinOffset));
+ LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
// Fetching data from a topic is a costly
// operation. In most cases, we expect the queues
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 22e3228..8b253b7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,8 @@ public class PubQueueProviderImpl implements PubQueueProvider {
@Override
public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
- ClearCallback callback = editable ? editableCallback(subSlingId, subAgentName) : null;
+ ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset); // forward the offset passed to the callback (as the removed editableCallback did), not the queue's minOffset
+ ClearCallback callback = editable ? editableCallback : null;
return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
}
@@ -132,11 +133,7 @@ public class PubQueueProviderImpl implements PubQueueProvider {
public void handleStatus(MessageInfo info, PackageStatusMessage message) {
if (message.getStatus() == REMOVED_FAILED) {
String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
- OffsetQueue<Long> errorQueue = errorQueues.get(errorQueueKey);
- if (errorQueue == null) {
- errorQueue = new OffsetQueueImpl<>();
- errorQueues.put(errorQueueKey, errorQueue);
- }
+ OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
errorQueue.putItem(info.getOffset(), message.getOffset());
}
}
@@ -146,18 +143,16 @@ public class PubQueueProviderImpl implements PubQueueProvider {
return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
}
- private ClearCallback editableCallback(String subSlingId, String subAgentName) {
- return (offset) -> {
- Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
- .setOffset(offset)
- .build();
- CommandMessage commandMessage = CommandMessage.newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setClearCommand(clearCommand)
- .build();
- sender.send(topics.getCommandTopic(), commandMessage);
- };
+ private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
+ Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
+ .setOffset(offset)
+ .build();
+ CommandMessage commandMessage = CommandMessage.newBuilder()
+ .setSubSlingId(subSlingId)
+ .setSubAgentName(subAgentName)
+ .setClearCommand(clearCommand)
+ .build();
+ sender.send(topics.getCommandTopic(), commandMessage);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
index 89f2b2a..47a9f25 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
@@ -22,7 +22,7 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERR
import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
import java.util.Calendar;
-import java.util.function.Function;
+import java.util.function.ToIntFunction;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -31,26 +31,26 @@ import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
public class QueueEntryFactory {
-
- private final String queueName;
- private final Function<DistributionQueueItem, Integer> attemptsCallback;
- public QueueEntryFactory(String queueName, Function<DistributionQueueItem, Integer> attemptsCallback) {
- this.queueName = queueName;
- this.attemptsCallback = attemptsCallback;
- }
-
+ private final String queueName;
+ private final ToIntFunction<DistributionQueueItem> attemptsCallback;
+
+ public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback) {
+ this.queueName = queueName;
+ this.attemptsCallback = attemptsCallback;
+ }
+
public DistributionQueueEntry create(DistributionQueueItem queueItem) {
- if (queueItem == null) {
- return null;
- }
+ if (queueItem == null) {
+ return null;
+ }
String entryId = EntryUtil.entryId(queueItem);
DistributionQueueItemStatus itemStatus = buildQueueItemStatus(queueItem);
return new DistributionQueueEntry(entryId, queueItem, itemStatus);
}
private DistributionQueueItemStatus buildQueueItemStatus(DistributionQueueItem queueItem) {
- Integer attempts = attemptsCallback.apply(queueItem);
+ int attempts = attemptsCallback.applyAsInt(queueItem);
DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED;
return new DistributionQueueItemStatus(itemCalendar(queueItem), state, attempts, queueName);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 194439b..a60b9c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -79,7 +79,7 @@ public class RangePoller {
private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
long offset = info.getOffset();
- LOG.debug(String.format("Reading offset %s", offset));
+ LOG.debug("Reading offset {}", offset);
if (offset < maxOffset) {
messages.add(new FullMessage<>(info, message));
} else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
index 6659a62..5d34e6d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
@@ -25,6 +25,7 @@ import java.util.Set;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,6 +47,8 @@ import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
@ParametersAreNonnullByDefault
public class SubQueue implements DistributionQueue {
+ private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
+
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
@@ -58,6 +61,7 @@ public class SubQueue implements DistributionQueue {
private final QueueEntryFactory entryFactory;
public SubQueue(String queueName,
+ @Nullable
DistributionQueueItem headItem,
PackageRetries packageRetries) {
this.headItem = headItem;
@@ -104,19 +108,19 @@ public class SubQueue implements DistributionQueue {
@Override
public DistributionQueueEntry remove(String entryId) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
@Override
public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
@Override
public Iterable<DistributionQueueEntry> clear(int limit) {
- throw new UnsupportedOperationException("Unsupported clear operation");
+ throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
}
@Nonnull
@@ -152,7 +156,7 @@ public class SubQueue implements DistributionQueue {
return false;
}
- private Integer attempts(DistributionQueueItem queueItem) {
+ private int attempts(DistributionQueueItem queueItem) {
String entryId = EntryUtil.entryId(queueItem);
return packageRetries.get(entryId);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
index 0e00ba9..858e8dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
@@ -35,9 +35,12 @@ import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING
@ParametersAreNonnullByDefault
public class AgentState {
+
+ private AgentState() {
+ }
public static DistributionAgentState getState(DistributionAgent agent) {
- boolean empty = ! queueStatuses(agent).anyMatch(AgentState::queueNotEmpty);
+ boolean empty = queueStatuses(agent).noneMatch(AgentState::queueNotEmpty);
if (empty) {
return IDLE;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 16504e9..f18c04c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -301,7 +301,7 @@ public class DistributionMetricsService {
}
public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
- return new GaugeService<T>(name, description, supplier);
+ return new GaugeService<>(name, description, supplier);
}
private String getMetricName(String component, String name) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
index 870813b..1f4d44a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
@@ -60,8 +60,8 @@ public class Topics {
this.statusTopic = config.statusTopic();
this.commandTopic = config.commandTopic();
this.eventTopic = config.eventTopic();
- LOG.info(String.format("Topics service started with packageTopic '%s' discoveryTopic '%s' statusTopic '%s' eventTopic '%s' commandTopic '%s'",
- packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic));
+ LOG.info("Topics service started with packageTopic '{}' discoveryTopic '{}' statusTopic '{}' eventTopic '{}' commandTopic '{}'",
+ packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic);
}
public String getPackageTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index ffc27bb..7ecd03b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -79,27 +79,28 @@ class Announcer implements Runnable, Closeable {
public void run() {
LOG.debug("Sending discovery message for agent {}", subAgentName);
try {
+ DiscoveryMessage msg = createDiscoveryMessage();
+ sender.accept(msg);
+ } catch (Exception e) {
+ LOG.info("Failed to send discovery message for agent {}, {}", subAgentName, e.getMessage(), e);
+ }
+ }
- long offset = bookKeeper.loadOffset();
-
- SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
- .setEditable(editable)
- .setMaxRetries(maxRetries)
- .build();
- DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
- .newBuilder()
- .setSubSlingId(subSlingId)
- .setSubAgentName(subAgentName)
- .setSubscriberConfiguration(subscriberConfiguration);
- for (String pubAgentName : pubAgentNames) {
- disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
- }
-
- sender.accept(disMsgBuilder.build());
- } catch (Throwable e) {
- String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
- LOG.info(msg, e);
+ private DiscoveryMessage createDiscoveryMessage() {
+ long offset = bookKeeper.loadOffset();
+ SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
+ .setEditable(editable)
+ .setMaxRetries(maxRetries)
+ .build();
+ DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
+ .newBuilder()
+ .setSubSlingId(subSlingId)
+ .setSubAgentName(subAgentName)
+ .setSubscriberConfiguration(subscriberConfiguration);
+ for (String pubAgentName : pubAgentNames) {
+ disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
}
+ return disMsgBuilder.build();
}
private SubscriberState subscriberState(String pubAgentName, long offset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 669d566..d39e57f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -77,6 +77,7 @@ import org.slf4j.MDC;
* agent on the leader instance.
*/
public class BookKeeper implements Closeable {
+ private static final String KEY_OFFSET = "offset";
private static final String SUBSERVICE_IMPORTER = "importer";
private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
private static final int RETRY_SEND_DELAY = 1000;
@@ -142,8 +143,8 @@ public class BookKeeper implements Closeable {
* once, thanks to the order in which the content updates are applied.
*/
public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
- log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
- pkgMsg.getReqType(), offset));
+ log.info("Importing distribution package {} of type {} at offset {}",
+ pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
addPackageMDC(pkgMsg);
try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
@@ -197,7 +198,8 @@ public class BookKeeper implements Closeable {
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
if (errorQueueEnabled && retries >= maxRetries) {
- log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+ log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.",
+ pkgMsg.getPkgId(), offset, retries);
removeFailedPackage(pkgMsg, offset);
} else {
packageRetries.increase(pubAgentName);
@@ -207,7 +209,8 @@ public class BookKeeper implements Closeable {
}
public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
- log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+ log.info("Removing distribution package {} of type {} at offset {}",
+ pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
if (editable) {
@@ -221,7 +224,7 @@ public class BookKeeper implements Closeable {
}
public void skipPackage(long offset) throws LoginException, PersistenceException {
- log.info(format("Skipping package at offset %s", offset));
+ log.info("Skipping package at offset {}", offset);
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeOffset(resolver, offset);
resolver.commit();
@@ -234,7 +237,7 @@ public class BookKeeper implements Closeable {
boolean sent = status.sent;
for (int retry = 0 ; !sent ; retry++) {
try {
- sendStatusMessage(status, retry);
+ sendStatusMessage(status);
markStatusSent();
sent = true;
} catch (Exception e) {
@@ -245,7 +248,7 @@ public class BookKeeper implements Closeable {
}
}
- private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+ private void sendStatusMessage(PackageStatus status) {
PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
.setSubSlingId(subSlingId)
.setSubAgentName(subAgentName)
@@ -267,7 +270,7 @@ public class BookKeeper implements Closeable {
}
public long loadOffset() {
- return processedOffsets.load("offset", -1L);
+ return processedOffsets.load(KEY_OFFSET, -1L);
}
public int getRetries(String pubAgentName) {
@@ -284,7 +287,8 @@ public class BookKeeper implements Closeable {
}
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
- log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+ log.info("Removing failed distribution package {} of type {} at offset {}",
+ pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
@@ -303,7 +307,7 @@ public class BookKeeper implements Closeable {
}
private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
- processedOffsets.store(resolver, "offset", offset);
+ processedOffsets.store(resolver, KEY_OFFSET, offset);
}
private ResourceResolver getServiceResolver(String subService) throws LoginException {
@@ -326,7 +330,7 @@ public class BookKeeper implements Closeable {
PackageStatus(ValueMap statusMap) {
Integer statusNum = statusMap.get("statusNumber", Integer.class);
this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
- this.offset = statusMap.get("offset", Long.class);
+ this.offset = statusMap.get(KEY_OFFSET, Long.class);
this.pubAgentName = statusMap.get("pubAgentName", String.class);
this.sent = statusMap.get("sent", true);
}
@@ -335,7 +339,7 @@ public class BookKeeper implements Closeable {
Map<String, Object> s = new HashMap<>();
s.put("pubAgentName", pubAgentName);
s.put("statusNumber", status.getNumber());
- s.put("offset", offset);
+ s.put(KEY_OFFSET, offset);
s.put("sent", sent);
return s;
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index c05292e..e56828f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -18,7 +18,6 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
-import static java.lang.String.format;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import java.io.Closeable;
@@ -34,11 +33,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommandPoller implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CommandPoller.class);
private final String subSlingId;
private final String subAgentName;
- private final Closeable commandPoller;
+ private final Closeable poller;
private final AtomicLong clearOffset = new AtomicLong(-1);
public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
@@ -55,12 +54,12 @@ public class CommandPoller implements Closeable {
* this optimisation will be removed.
*/
- commandPoller = messagingProvider.createPoller(
+ poller = messagingProvider.createPoller(
topics.getCommandTopic(),
Reset.earliest,
create(CommandMessage.class, this::handleCommandMessage));
} else {
- commandPoller = null;
+ poller = null;
}
}
@@ -70,7 +69,7 @@ public class CommandPoller implements Closeable {
private void handleCommandMessage(MessageInfo info, CommandMessage message) {
if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
- LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+ LOG.debug("Skip command for subSlingId {}", message.getSubSlingId());
return;
}
@@ -92,6 +91,6 @@ public class CommandPoller implements Closeable {
@Override
public void close() {
- IOUtils.closeQuietly(commandPoller);
+ IOUtils.closeQuietly(poller);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 13b59b9..6f6d676 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -259,7 +259,9 @@ public class DistributionSubscriber implements DistributionAgent {
@Override
public DistributionQueue getQueue(@Nonnull String queueName) {
- DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+ DistributionQueueItem head = queueItemsBuffer.stream()
+ .filter(item -> isIn(queueName, item))
+ .findFirst()
.orElse(null);
return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index 7307fdc..ae355c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -63,7 +63,7 @@ public class PackageHandler {
private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
throws DistributionException {
- LOG.info("Importing paths " + pkgMsg.getPathsList());
+ LOG.info("Importing paths {}",pkgMsg.getPathsList());
InputStream pkgStream = null;
try {
pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);