You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/25 18:50:22 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8932 - Fix issues from sonar report (#21)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new b5b51f0  SLING-8932 - Fix issues from sonar report (#21)
b5b51f0 is described below

commit b5b51f02a4a6cd0c75c41e517edcaf75c6d1566d
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed Dec 25 19:50:14 2019 +0100

    SLING-8932 - Fix issues from sonar report (#21)
---
 .../journal/impl/event/DistributionEvent.java      | 10 +++---
 .../journal/impl/publisher/DistPublisherJMX.java   | 12 ++++---
 .../impl/publisher/PackageMessageFactory.java      |  2 +-
 .../journal/impl/publisher/PackageRepo.java        | 12 ++++---
 .../journal/impl/queue/QueueItemFactory.java       | 21 +++++++-----
 .../journal/impl/queue/impl/EntryUtil.java         |  3 ++
 .../journal/impl/queue/impl/PubErrQueue.java       | 11 +++---
 .../journal/impl/queue/impl/PubQueue.java          | 10 +++---
 .../journal/impl/queue/impl/PubQueueCache.java     |  2 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      | 31 ++++++++---------
 .../journal/impl/queue/impl/QueueEntryFactory.java | 26 +++++++--------
 .../journal/impl/queue/impl/RangePoller.java       |  2 +-
 .../journal/impl/queue/impl/SubQueue.java          | 12 ++++---
 .../journal/impl/shared/AgentState.java            |  5 ++-
 .../impl/shared/DistributionMetricsService.java    |  2 +-
 .../distribution/journal/impl/shared/Topics.java   |  4 +--
 .../journal/impl/subscriber/Announcer.java         | 39 +++++++++++-----------
 .../journal/impl/subscriber/BookKeeper.java        | 28 +++++++++-------
 .../journal/impl/subscriber/CommandPoller.java     | 13 ++++----
 .../impl/subscriber/DistributionSubscriber.java    |  4 ++-
 .../journal/impl/subscriber/PackageHandler.java    |  2 +-
 21 files changed, 136 insertions(+), 115 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
index 77cf31c..5811357 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
@@ -45,24 +45,26 @@ import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 public class DistributionEvent {
 
     public static final String PACKAGE_ID = "distribution.package.id";
+    private static final String KIND_AGENT = "agent";
+    private static final String KIND_IMPORTER = "importer";
 
     public static Event eventImporterImported(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(IMPORTER_PACKAGE_IMPORTED, "importer", agentName, pkgMsg);
+        return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName, pkgMsg);
     }
 
     public static Event eventPackageCreated(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(AGENT_PACKAGE_CREATED, "agent", agentName, pkgMsg);
+        return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName, pkgMsg);
     }
 
     public static Event eventPackageDistributed(DistributionQueueItem queueItem, String agentName) {
-        return buildEvent(AGENT_PACKAGE_DISTRIBUTED, "agent", agentName,
+        return buildEvent(AGENT_PACKAGE_DISTRIBUTED, KIND_AGENT, agentName,
                 queueItem.get(PROPERTY_PACKAGE_TYPE, String.class),
                 queueItem.get(PROPERTY_REQUEST_PATHS, String[].class),
                 queueItem.getPackageId());
     }
 
     public static Event eventPackageQueued(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(AGENT_PACKAGE_QUEUED, "agent",agentName, pkgMsg);
+        return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg);
     }
 
     private static Event buildEvent(String topic, String kind, String agentName, PackageMessage pkgMsg) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
index e471ee6..9440ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
@@ -38,6 +38,8 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
 
+    private static final String COL_ID = "ID";
+    private static final String COL_OFFSET = "offset";
     private String pubAgentName;
     private DiscoveryService discoveryService;
     private DistributionPublisher distPublisher;
@@ -55,10 +57,10 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
     @Override
     public TabularData getOffsetPerSubscriber() throws MBeanException {
         try {
-            String[] itemNames = new String[] {"ID", "offset"};
+            String[] itemNames = new String[] {COL_ID, COL_OFFSET};
             OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
             CompositeType rowType = new CompositeType("Offsets", "Offsets by sub agent", itemNames, itemNames, itemTypes);
-            TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+            TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
             TabularDataSupport table = new TabularDataSupport(type);
             Set<State> subscribedAgents = discoveryService.getTopologyView().getSubscribedAgents(pubAgentName);
             for (State state : subscribedAgents) {
@@ -77,16 +79,16 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
     @Override
     public TabularData getQueue(String queueName) throws MBeanException {
         try {
-            String[] itemNames = new String[] {"ID", "offset"};
+            String[] itemNames = new String[] {COL_ID, COL_OFFSET};
             OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
             CompositeType rowType = new CompositeType("Offsets", "Queue Offsets", itemNames, itemNames, itemTypes);
-            TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+            TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
             TabularDataSupport table = new TabularDataSupport(type);
             DistributionQueue queue = distPublisher.getQueue(queueName);
             if (queue != null) {
                 for (DistributionQueueEntry item : queue.getEntries(0, 1000)) {
                     CompositeData row = new CompositeDataSupport(rowType, itemNames,
-                            new Object[] { item.getId(), item.getItem().get("offset")});
+                            new Object[] { item.getId(), item.getItem().get(COL_OFFSET)});
                     table.put(row);
                 }
             }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index fcd3869..2e7d65e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -129,7 +129,7 @@ public class PackageMessageFactory {
              * always the case.
              */
 
-            LOG.info(String.format("Package %s too large (%sB) to be sent inline", disPkg.getId(), pkgLength));
+            LOG.info("Package {} too large ({}B) to be sent inline", disPkg.getId(), pkgLength);
             String pkgBinRef = packageRepo.store(resourceResolver, disPkg);
             pkgBuilder.setPkgBinaryRef(pkgBinRef);
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index 0ab1e72..e87a960 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -59,6 +59,8 @@ import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 @ParametersAreNonnullByDefault
 public class PackageRepo {
 
+    private static final String SLING_FOLDER = "sling:Folder";
+
     @Reference
     private ResourceResolverFactory resolverFactory;
     
@@ -84,7 +86,7 @@ public class PackageRepo {
         try {
             String pkgPath = String.format(PACKAGE_PATH_PATTERN, disPkg.getType(), disPkg.getId());
             Resource pkgResource = ResourceUtil.getOrCreateResource(resolver,
-                    pkgPath, "sling:Folder", "sling:Folder", false);
+                    pkgPath, SLING_FOLDER, SLING_FOLDER, false);
             Node pkgNode = pkgResource.adaptTo(Node.class);
             Node binNode = JcrUtils.getOrAddNode(pkgNode, "bin", NodeType.NT_FILE);
             Node cntNode = JcrUtils.getOrAddNode(binNode, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
@@ -161,14 +163,14 @@ public class PackageRepo {
             throws PersistenceException {
         long offset = pkg.getValueMap().get("offset", -1);
         if (offset < 0) {
-            LOG.info(String.format("keep package %s, setting tail offset %s", pkg.getName(), tailOffset));
+            LOG.info("keep package {}, setting tail offset {}", pkg.getName(), tailOffset);
             pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset);
         } else if (offset < headOffset) {
-            LOG.info(String.format("remove package %s, offset smaller than head offset %s < %s", pkg.getName(), offset, headOffset));
+            LOG.info("remove package {}, offset smaller than head offset {} < {}", pkg.getName(), offset, headOffset);
             resolver.delete(pkg);
             return 1;
         } else {
-            LOG.debug(String.format("keep package %s, offset bigger or equal to head offset %s >= %s", pkg.getName(), offset, headOffset));
+            LOG.debug("keep package {}, offset bigger or equal to head offset {} >= {}", pkg.getName(), offset, headOffset);
         }
         return 0;
     }
@@ -176,6 +178,6 @@ public class PackageRepo {
     @Nonnull
     private Resource getRoot(ResourceResolver resolver)
             throws PersistenceException {
-        return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, "sling:Folder", "sling:Folder", true);
+        return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
index 5296b62..c9808d8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
@@ -52,6 +52,9 @@ public final class QueueItemFactory {
     public static final String PACKAGE_MSG = "packageMessage";
 
     private static final String REQUEST_USER_ID = "internal.request.user";
+    
+    private QueueItemFactory() {
+    }
 
     public static DistributionQueueItem fromPackage(FullMessage<PackageMessage> fMessage) {
         return fromPackage(fMessage.getInfo(), fMessage.getMessage(), false);
@@ -85,17 +88,17 @@ public final class QueueItemFactory {
     }
     
     private static DistributionRequestType toDistReqType(ReqType reqType) {
-    	switch (reqType) {
-		case ADD:
-			return DistributionRequestType.ADD;
-		case DELETE:
-			return DistributionRequestType.DELETE;
+        switch (reqType) {
+        case ADD:
+            return DistributionRequestType.ADD;
+        case DELETE:
+            return DistributionRequestType.DELETE;
         case TEST:
             return DistributionRequestType.TEST;
-		default:
-			throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
-		}
-	}
+        default:
+            throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
+        }
+    }
 
 	@Nonnull
     private static String[] toArray(List<String> paths) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
index 9ef3694..529fc52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
@@ -22,6 +22,9 @@ import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 
 public final class EntryUtil {
+    
+    private EntryUtil() {
+    }
 
     public static long entryOffset(String entryId) {
         String[] chunks = entryId.split("@");
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index f6ed1cf..aaa7b1c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -40,6 +40,7 @@ import static java.util.Objects.requireNonNull;
 @ParametersAreNonnullByDefault
 public class PubErrQueue implements DistributionQueue {
 
+    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
     private static final Logger LOG = LoggerFactory.getLogger(PubErrQueue.class);
 
     private final OffsetQueue<DistributionQueueItem> agentQueue;
@@ -54,7 +55,7 @@ public class PubErrQueue implements DistributionQueue {
         this.queueName = requireNonNull(queueName);
         this.agentQueue = requireNonNull(agentQueue);
         this.errorQueue = requireNonNull(errorQueue);
-        this.entryFactory = new QueueEntryFactory(queueName, (queueItem) -> 0);
+        this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
     }
 
     @Nonnull
@@ -87,7 +88,7 @@ public class PubErrQueue implements DistributionQueue {
             if (queueItem != null) {
                 entries.add(entryFactory.create(queueItem));
             } else {
-                LOG.warn(String.format("queueItem at offset %s not found", refOffset));
+                LOG.warn("queueItem at offset {} not found", refOffset);
             }
         }
         return entries;
@@ -103,19 +104,19 @@ public class PubErrQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry remove(@Nonnull String entryId) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index b6874c5..93f0dca 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -103,8 +103,8 @@ public class PubQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry getHead() {
-        DistributionQueueItem headItem = offsetQueue.getHeadItem();
-        return entryFactory.create(headItem);
+        DistributionQueueItem queueItem = offsetQueue.getHeadItem();
+        return entryFactory.create(queueItem);
     }
 
     @Nonnull
@@ -209,9 +209,9 @@ public class PubQueue implements DistributionQueue {
         return capabilities.contains(capability);
     }
 
-	private Integer attempts(DistributionQueueItem queueItem) {
-		return queueItem.equals(headItem) ? retries : 0;
-	}
+    private int attempts(DistributionQueueItem queueItem) {
+        return queueItem.equals(headItem) ? retries : 0;
+    }
 
     private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
         List<DistributionQueueEntry> removed = new ArrayList<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 438c765..75d71fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -209,7 +209,7 @@ public class PubQueueCache {
 
         if (requestedMinOffset < cachedMinOffset) {
 
-            LOG.debug(String.format("Requested min offset %s smaller than cached min offset %s", requestedMinOffset, cachedMinOffset));
+            LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
 
             // Fetching data from a topic is a costly
             // operation. In most cases, we expect the queues
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 22e3228..8b253b7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,8 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     @Override
     public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
         OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback callback = editable ? editableCallback(subSlingId, subAgentName) : null;
+        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, minOffset);
+        ClearCallback callback = editable ? editableCallback : null;
         return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
     }
 
@@ -132,11 +133,7 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     public void handleStatus(MessageInfo info, PackageStatusMessage message) {
         if (message.getStatus() == REMOVED_FAILED) {
             String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
-            OffsetQueue<Long> errorQueue = errorQueues.get(errorQueueKey);
-            if (errorQueue == null) {
-                errorQueue = new OffsetQueueImpl<>();
-                errorQueues.put(errorQueueKey, errorQueue);
-            }
+            OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
             errorQueue.putItem(info.getOffset(), message.getOffset());
         }
     }
@@ -146,18 +143,16 @@ public class PubQueueProviderImpl implements PubQueueProvider {
         return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
     }
 
-    private ClearCallback editableCallback(String subSlingId, String subAgentName) {
-        return (offset) -> {
-            Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
-                    .setOffset(offset)
-                    .build();
-            CommandMessage commandMessage = CommandMessage.newBuilder()
-                    .setSubSlingId(subSlingId)
-                    .setSubAgentName(subAgentName)
-                    .setClearCommand(clearCommand)
-                    .build();
-            sender.send(topics.getCommandTopic(), commandMessage);
-        };
+    private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
+        Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
+                .setOffset(offset)
+                .build();
+        CommandMessage commandMessage = CommandMessage.newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setClearCommand(clearCommand)
+                .build();
+        sender.send(topics.getCommandTopic(), commandMessage);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
index 89f2b2a..47a9f25 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
@@ -22,7 +22,7 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERR
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
 
 import java.util.Calendar;
-import java.util.function.Function;
+import java.util.function.ToIntFunction;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -31,26 +31,26 @@ import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 
 public class QueueEntryFactory {
-	
-	private final String queueName;
-	private final Function<DistributionQueueItem, Integer> attemptsCallback;
 
-	public QueueEntryFactory(String queueName, Function<DistributionQueueItem, Integer> attemptsCallback) {
-		this.queueName = queueName;
-		this.attemptsCallback = attemptsCallback;
-	}
-	
+    private final String queueName;
+    private final ToIntFunction<DistributionQueueItem> attemptsCallback;
+
+    public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback) {
+        this.queueName = queueName;
+        this.attemptsCallback = attemptsCallback;
+    }
+
     public DistributionQueueEntry create(DistributionQueueItem queueItem) {
-    	if (queueItem == null) {
-    		return null;
-    	}
+        if (queueItem == null) {
+            return null;
+        }
         String entryId = EntryUtil.entryId(queueItem);
         DistributionQueueItemStatus itemStatus = buildQueueItemStatus(queueItem);
         return new DistributionQueueEntry(entryId, queueItem, itemStatus);
     }
 
     private DistributionQueueItemStatus buildQueueItemStatus(DistributionQueueItem queueItem) {
-    	Integer attempts = attemptsCallback.apply(queueItem);
+        int attempts = attemptsCallback.applyAsInt(queueItem);
         DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED;
         return new DistributionQueueItemStatus(itemCalendar(queueItem), state, attempts, queueName);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 194439b..a60b9c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -79,7 +79,7 @@ public class RangePoller {
 
     private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
         long offset = info.getOffset();
-        LOG.debug(String.format("Reading offset %s", offset));
+        LOG.debug("Reading offset {}", offset);
         if (offset < maxOffset) {
             messages.add(new FullMessage<>(info, message));
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
index 6659a62..5d34e6d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,6 +47,8 @@ import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
 @ParametersAreNonnullByDefault
 public class SubQueue implements DistributionQueue {
 
+    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
+
     @SuppressWarnings("unused")
     private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
 
@@ -58,6 +61,7 @@ public class SubQueue implements DistributionQueue {
 	private final  QueueEntryFactory entryFactory;
 
     public SubQueue(String queueName,
+                    @Nullable
                     DistributionQueueItem headItem,
                     PackageRetries packageRetries) {
         this.headItem = headItem;
@@ -104,19 +108,19 @@ public class SubQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry remove(String entryId) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
@@ -152,7 +156,7 @@ public class SubQueue implements DistributionQueue {
         return false;
     }
 
-    private Integer attempts(DistributionQueueItem queueItem) {
+    private int attempts(DistributionQueueItem queueItem) {
         String entryId = EntryUtil.entryId(queueItem);
         return packageRetries.get(entryId);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
index 0e00ba9..858e8dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
@@ -35,9 +35,12 @@ import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING
 
 @ParametersAreNonnullByDefault
 public class AgentState {
+    
+    private AgentState() {
+    }
 
     public static DistributionAgentState getState(DistributionAgent agent) {
-        boolean empty = ! queueStatuses(agent).anyMatch(AgentState::queueNotEmpty);
+        boolean empty = queueStatuses(agent).noneMatch(AgentState::queueNotEmpty);
         if (empty) {
             return IDLE;
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 16504e9..f18c04c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -301,7 +301,7 @@ public class DistributionMetricsService {
     }
     
     public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
-        return new GaugeService<T>(name, description, supplier);
+        return new GaugeService<>(name, description, supplier);
     }
 
     private String getMetricName(String component, String name) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
index 870813b..1f4d44a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
@@ -60,8 +60,8 @@ public class Topics {
         this.statusTopic = config.statusTopic();
         this.commandTopic = config.commandTopic();
         this.eventTopic = config.eventTopic();
-        LOG.info(String.format("Topics service started with packageTopic '%s' discoveryTopic '%s' statusTopic '%s' eventTopic '%s' commandTopic '%s'",
-                packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic));
+        LOG.info("Topics service started with packageTopic '{}' discoveryTopic '{}' statusTopic '{}' eventTopic '{}' commandTopic '{}'",
+                packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic);
     }
     
     public String getPackageTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index ffc27bb..7ecd03b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -79,27 +79,28 @@ class Announcer implements Runnable, Closeable {
     public void run() {
         LOG.debug("Sending discovery message for agent {}", subAgentName);
         try {
+            DiscoveryMessage msg = createDiscoveryMessage();
+            sender.accept(msg);
+        } catch (Exception e) {
+            LOG.info("Failed to send discovery message for agent {}, {}", subAgentName, e.getMessage(), e);
+        }
+    }
 
-            long offset = bookKeeper.loadOffset();
-
-            SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
-                    .setEditable(editable)
-                    .setMaxRetries(maxRetries)
-                    .build();
-            DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
-                    .newBuilder()
-                    .setSubSlingId(subSlingId)
-                    .setSubAgentName(subAgentName)
-                    .setSubscriberConfiguration(subscriberConfiguration);
-            for (String pubAgentName : pubAgentNames) {
-                disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
-            }
-
-            sender.accept(disMsgBuilder.build());
-        } catch (Throwable e) {
-            String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
-            LOG.info(msg, e);
+    private DiscoveryMessage createDiscoveryMessage() {
+        long offset = bookKeeper.loadOffset();
+        SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
+                .setEditable(editable)
+                .setMaxRetries(maxRetries)
+                .build();
+        DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
+                .newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setSubscriberConfiguration(subscriberConfiguration);
+        for (String pubAgentName : pubAgentNames) {
+            disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
         }
+        return disMsgBuilder.build();
     }
 
     private SubscriberState subscriberState(String pubAgentName, long offset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 669d566..d39e57f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -77,6 +77,7 @@ import org.slf4j.MDC;
  * agent on the leader instance.
  */
 public class BookKeeper implements Closeable {
+    private static final String KEY_OFFSET = "offset";
     private static final String SUBSERVICE_IMPORTER = "importer";
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
@@ -142,8 +143,8 @@ public class BookKeeper implements Closeable {
      * once, thanks to the order in which the content updates are applied.
      */
     public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
-        log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
-                pkgMsg.getReqType(), offset));
+        log.info("Importing distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         addPackageMDC(pkgMsg);
         try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
@@ -197,7 +198,8 @@ public class BookKeeper implements Closeable {
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
         if (errorQueueEnabled && retries >= maxRetries) {
-            log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+            log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.", 
+                    pkgMsg.getPkgId(), offset, retries);
             removeFailedPackage(pkgMsg, offset);
         } else {
             packageRetries.increase(pubAgentName);
@@ -207,7 +209,8 @@ public class BookKeeper implements Closeable {
     }
 
     public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
-        log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        log.info("Removing distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
@@ -221,7 +224,7 @@ public class BookKeeper implements Closeable {
     }
     
     public void skipPackage(long offset) throws LoginException, PersistenceException {
-        log.info(format("Skipping package at offset %s", offset));
+        log.info("Skipping package at offset {}", offset);
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeOffset(resolver, offset);
             resolver.commit();
@@ -234,7 +237,7 @@ public class BookKeeper implements Closeable {
             boolean sent = status.sent;
             for (int retry = 0 ; !sent ; retry++) {
                 try {
-                    sendStatusMessage(status, retry);
+                    sendStatusMessage(status);
                     markStatusSent();
                     sent = true;
                 } catch (Exception e) {
@@ -245,7 +248,7 @@ public class BookKeeper implements Closeable {
         }
     }
     
-    private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+    private void sendStatusMessage(PackageStatus status) {
         PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
                 .setSubSlingId(subSlingId)
                 .setSubAgentName(subAgentName)
@@ -267,7 +270,7 @@ public class BookKeeper implements Closeable {
     }
     
     public long loadOffset() {
-        return  processedOffsets.load("offset", -1L);
+        return  processedOffsets.load(KEY_OFFSET, -1L);
     }
 
     public int getRetries(String pubAgentName) {
@@ -284,7 +287,8 @@ public class BookKeeper implements Closeable {
     }
     
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
-        log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        log.info("Removing failed distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
@@ -303,7 +307,7 @@ public class BookKeeper implements Closeable {
     }
 
     private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
-        processedOffsets.store(resolver, "offset", offset);
+        processedOffsets.store(resolver, KEY_OFFSET, offset);
     }
 
     private ResourceResolver getServiceResolver(String subService) throws LoginException {
@@ -326,7 +330,7 @@ public class BookKeeper implements Closeable {
         PackageStatus(ValueMap statusMap) {
             Integer statusNum = statusMap.get("statusNumber", Integer.class);
             this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
-            this.offset = statusMap.get("offset", Long.class);
+            this.offset = statusMap.get(KEY_OFFSET, Long.class);
             this.pubAgentName = statusMap.get("pubAgentName", String.class);
             this.sent = statusMap.get("sent", true);
         }
@@ -335,7 +339,7 @@ public class BookKeeper implements Closeable {
             Map<String, Object> s = new HashMap<>();
             s.put("pubAgentName", pubAgentName);
             s.put("statusNumber", status.getNumber());
-            s.put("offset", offset);
+            s.put(KEY_OFFSET, offset);
             s.put("sent", sent);
             return s;
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index c05292e..e56828f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import static java.lang.String.format;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
@@ -34,11 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CommandPoller implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CommandPoller.class);
 
     private final String subSlingId;
     private final String subAgentName;
-    private final Closeable commandPoller;
+    private final Closeable poller;
     private final AtomicLong clearOffset = new AtomicLong(-1);
 
     public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
@@ -55,12 +54,12 @@ public class CommandPoller implements Closeable {
              * this optimisation will be removed.
              */
 
-            commandPoller = messagingProvider.createPoller(
+            poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
                     create(CommandMessage.class, this::handleCommandMessage));
         } else {
-            commandPoller = null;
+            poller = null;
         }
     }
     
@@ -70,7 +69,7 @@ public class CommandPoller implements Closeable {
 
     private void handleCommandMessage(MessageInfo info, CommandMessage message) {
         if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
-            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+            LOG.debug("Skip command for subSlingId {}", message.getSubSlingId());
             return;
         }
 
@@ -92,6 +91,6 @@ public class CommandPoller implements Closeable {
 
     @Override
     public void close() {
-        IOUtils.closeQuietly(commandPoller);
+        IOUtils.closeQuietly(poller);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 13b59b9..6f6d676 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -259,7 +259,9 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Override
     public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+        DistributionQueueItem head = queueItemsBuffer.stream()
+                .filter(item -> isIn(queueName, item))
+                .findFirst()
                 .orElse(null);
         return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index 7307fdc..ae355c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -63,7 +63,7 @@ public class PackageHandler {
 
     private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
             throws DistributionException {
-        LOG.info("Importing paths " + pkgMsg.getPathsList());
+        LOG.info("Importing paths {}",pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
             pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);