You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/12/25 18:37:14 UTC

[sling-org-apache-sling-distribution-journal] branch SLING-8932-2 created (now 050f685)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch SLING-8932-2
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git.


      at 050f685  SLING-8932 - Fix issues from sonar report

This branch includes the following new commits:

     new 050f685  SLING-8932 - Fix issues from sonar report

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[sling-org-apache-sling-distribution-journal] 01/01: SLING-8932 - Fix issues from sonar report

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-8932-2
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 050f685e5c687703dc5d6deff05ea965caba0a88
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Dec 25 19:36:44 2019 +0100

    SLING-8932 - Fix issues from sonar report
---
 .../journal/impl/event/DistributionEvent.java      | 10 +++---
 .../journal/impl/publisher/DistPublisherJMX.java   | 12 ++++---
 .../impl/publisher/PackageMessageFactory.java      |  2 +-
 .../journal/impl/publisher/PackageRepo.java        | 12 ++++---
 .../journal/impl/queue/QueueItemFactory.java       | 21 +++++++-----
 .../journal/impl/queue/impl/EntryUtil.java         |  3 ++
 .../journal/impl/queue/impl/PubErrQueue.java       | 11 +++---
 .../journal/impl/queue/impl/PubQueue.java          | 10 +++---
 .../journal/impl/queue/impl/PubQueueCache.java     |  2 +-
 .../impl/queue/impl/PubQueueProviderImpl.java      | 31 ++++++++---------
 .../journal/impl/queue/impl/QueueEntryFactory.java | 26 +++++++--------
 .../journal/impl/queue/impl/RangePoller.java       |  2 +-
 .../journal/impl/queue/impl/SubQueue.java          | 12 ++++---
 .../journal/impl/shared/AgentState.java            |  5 ++-
 .../impl/shared/DistributionMetricsService.java    |  2 +-
 .../distribution/journal/impl/shared/Topics.java   |  4 +--
 .../journal/impl/subscriber/Announcer.java         | 39 +++++++++++-----------
 .../journal/impl/subscriber/BookKeeper.java        | 28 +++++++++-------
 .../journal/impl/subscriber/CommandPoller.java     | 13 ++++----
 .../impl/subscriber/DistributionSubscriber.java    |  4 ++-
 .../journal/impl/subscriber/PackageHandler.java    |  2 +-
 21 files changed, 136 insertions(+), 115 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
index 77cf31c..5811357 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/event/DistributionEvent.java
@@ -45,24 +45,26 @@ import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 public class DistributionEvent {
 
     public static final String PACKAGE_ID = "distribution.package.id";
+    private static final String KIND_AGENT = "agent";
+    private static final String KIND_IMPORTER = "importer";
 
     public static Event eventImporterImported(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(IMPORTER_PACKAGE_IMPORTED, "importer", agentName, pkgMsg);
+        return buildEvent(IMPORTER_PACKAGE_IMPORTED, KIND_IMPORTER, agentName, pkgMsg);
     }
 
     public static Event eventPackageCreated(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(AGENT_PACKAGE_CREATED, "agent", agentName, pkgMsg);
+        return buildEvent(AGENT_PACKAGE_CREATED, KIND_AGENT, agentName, pkgMsg);
     }
 
     public static Event eventPackageDistributed(DistributionQueueItem queueItem, String agentName) {
-        return buildEvent(AGENT_PACKAGE_DISTRIBUTED, "agent", agentName,
+        return buildEvent(AGENT_PACKAGE_DISTRIBUTED, KIND_AGENT, agentName,
                 queueItem.get(PROPERTY_PACKAGE_TYPE, String.class),
                 queueItem.get(PROPERTY_REQUEST_PATHS, String[].class),
                 queueItem.getPackageId());
     }
 
     public static Event eventPackageQueued(Messages.PackageMessage pkgMsg, String agentName) {
-        return buildEvent(AGENT_PACKAGE_QUEUED, "agent",agentName, pkgMsg);
+        return buildEvent(AGENT_PACKAGE_QUEUED, KIND_AGENT, agentName, pkgMsg);
     }
 
     private static Event buildEvent(String topic, String kind, String agentName, PackageMessage pkgMsg) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
index e471ee6..9440ffe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistPublisherJMX.java
@@ -38,6 +38,8 @@ import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
 
+    private static final String COL_ID = "ID";
+    private static final String COL_OFFSET = "offset";
     private String pubAgentName;
     private DiscoveryService discoveryService;
     private DistributionPublisher distPublisher;
@@ -55,10 +57,10 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
     @Override
     public TabularData getOffsetPerSubscriber() throws MBeanException {
         try {
-            String[] itemNames = new String[] {"ID", "offset"};
+            String[] itemNames = new String[] {COL_ID, COL_OFFSET};
             OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
             CompositeType rowType = new CompositeType("Offsets", "Offsets by sub agent", itemNames, itemNames, itemTypes);
-            TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+            TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
             TabularDataSupport table = new TabularDataSupport(type);
             Set<State> subscribedAgents = discoveryService.getTopologyView().getSubscribedAgents(pubAgentName);
             for (State state : subscribedAgents) {
@@ -77,16 +79,16 @@ class DistPublisherJMX extends StandardMBean implements DistPublisherJMXMBean {
     @Override
     public TabularData getQueue(String queueName) throws MBeanException {
         try {
-            String[] itemNames = new String[] {"ID", "offset"};
+            String[] itemNames = new String[] {COL_ID, COL_OFFSET};
             OpenType<?>[] itemTypes = new OpenType[]{SimpleType.STRING, SimpleType.LONG};
             CompositeType rowType = new CompositeType("Offsets", "Queue Offsets", itemNames, itemNames, itemTypes);
-            TabularType type = new TabularType("type", "desc", rowType, new String[] {"ID"});
+            TabularType type = new TabularType("type", "desc", rowType, new String[] {COL_ID});
             TabularDataSupport table = new TabularDataSupport(type);
             DistributionQueue queue = distPublisher.getQueue(queueName);
             if (queue != null) {
                 for (DistributionQueueEntry item : queue.getEntries(0, 1000)) {
                     CompositeData row = new CompositeDataSupport(rowType, itemNames,
-                            new Object[] { item.getId(), item.getItem().get("offset")});
+                            new Object[] { item.getId(), item.getItem().get(COL_OFFSET)});
                     table.put(row);
                 }
             }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
index fcd3869..2e7d65e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageMessageFactory.java
@@ -129,7 +129,7 @@ public class PackageMessageFactory {
              * always the case.
              */
 
-            LOG.info(String.format("Package %s too large (%sB) to be sent inline", disPkg.getId(), pkgLength));
+            LOG.info("Package {} too large ({}B) to be sent inline", disPkg.getId(), pkgLength);
             String pkgBinRef = packageRepo.store(resourceResolver, disPkg);
             pkgBuilder.setPkgBinaryRef(pkgBinRef);
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
index 0ab1e72..e87a960 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageRepo.java
@@ -59,6 +59,8 @@ import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 @ParametersAreNonnullByDefault
 public class PackageRepo {
 
+    private static final String SLING_FOLDER = "sling:Folder";
+
     @Reference
     private ResourceResolverFactory resolverFactory;
     
@@ -84,7 +86,7 @@ public class PackageRepo {
         try {
             String pkgPath = String.format(PACKAGE_PATH_PATTERN, disPkg.getType(), disPkg.getId());
             Resource pkgResource = ResourceUtil.getOrCreateResource(resolver,
-                    pkgPath, "sling:Folder", "sling:Folder", false);
+                    pkgPath, SLING_FOLDER, SLING_FOLDER, false);
             Node pkgNode = pkgResource.adaptTo(Node.class);
             Node binNode = JcrUtils.getOrAddNode(pkgNode, "bin", NodeType.NT_FILE);
             Node cntNode = JcrUtils.getOrAddNode(binNode, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
@@ -161,14 +163,14 @@ public class PackageRepo {
             throws PersistenceException {
         long offset = pkg.getValueMap().get("offset", -1);
         if (offset < 0) {
-            LOG.info(String.format("keep package %s, setting tail offset %s", pkg.getName(), tailOffset));
+            LOG.info("keep package {}, setting tail offset {}", pkg.getName(), tailOffset);
             pkg.adaptTo(ModifiableValueMap.class).put("offset", tailOffset);
         } else if (offset < headOffset) {
-            LOG.info(String.format("remove package %s, offset smaller than head offset %s < %s", pkg.getName(), offset, headOffset));
+            LOG.info("remove package {}, offset smaller than head offset {} < {}", pkg.getName(), offset, headOffset);
             resolver.delete(pkg);
             return 1;
         } else {
-            LOG.debug(String.format("keep package %s, offset bigger or equal to head offset %s >= %s", pkg.getName(), offset, headOffset));
+            LOG.debug("keep package {}, offset bigger or equal to head offset {} >= {}", pkg.getName(), offset, headOffset);
         }
         return 0;
     }
@@ -176,6 +178,6 @@ public class PackageRepo {
     @Nonnull
     private Resource getRoot(ResourceResolver resolver)
             throws PersistenceException {
-        return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, "sling:Folder", "sling:Folder", true);
+        return ResourceUtil.getOrCreateResource(resolver, PACKAGES_ROOT_PATH, SLING_FOLDER, SLING_FOLDER, true);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
index 5296b62..c9808d8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/QueueItemFactory.java
@@ -52,6 +52,9 @@ public final class QueueItemFactory {
     public static final String PACKAGE_MSG = "packageMessage";
 
     private static final String REQUEST_USER_ID = "internal.request.user";
+    
+    private QueueItemFactory() {
+    }
 
     public static DistributionQueueItem fromPackage(FullMessage<PackageMessage> fMessage) {
         return fromPackage(fMessage.getInfo(), fMessage.getMessage(), false);
@@ -85,17 +88,17 @@ public final class QueueItemFactory {
     }
     
     private static DistributionRequestType toDistReqType(ReqType reqType) {
-    	switch (reqType) {
-		case ADD:
-			return DistributionRequestType.ADD;
-		case DELETE:
-			return DistributionRequestType.DELETE;
+        switch (reqType) {
+        case ADD:
+            return DistributionRequestType.ADD;
+        case DELETE:
+            return DistributionRequestType.DELETE;
         case TEST:
             return DistributionRequestType.TEST;
-		default:
-			throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
-		}
-	}
+        default:
+            throw new IllegalArgumentException("Unhandled DistributionRequestType: " + reqType.name());
+        }
+    }
 
 	@Nonnull
     private static String[] toArray(List<String> paths) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
index 9ef3694..529fc52 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/EntryUtil.java
@@ -22,6 +22,9 @@ import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 
 public final class EntryUtil {
+    
+    private EntryUtil() {
+    }
 
     public static long entryOffset(String entryId) {
         String[] chunks = entryId.split("@");
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
index f6ed1cf..aaa7b1c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubErrQueue.java
@@ -40,6 +40,7 @@ import static java.util.Objects.requireNonNull;
 @ParametersAreNonnullByDefault
 public class PubErrQueue implements DistributionQueue {
 
+    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
     private static final Logger LOG = LoggerFactory.getLogger(PubErrQueue.class);
 
     private final OffsetQueue<DistributionQueueItem> agentQueue;
@@ -54,7 +55,7 @@ public class PubErrQueue implements DistributionQueue {
         this.queueName = requireNonNull(queueName);
         this.agentQueue = requireNonNull(agentQueue);
         this.errorQueue = requireNonNull(errorQueue);
-        this.entryFactory = new QueueEntryFactory(queueName, (queueItem) -> 0);
+        this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
     }
 
     @Nonnull
@@ -87,7 +88,7 @@ public class PubErrQueue implements DistributionQueue {
             if (queueItem != null) {
                 entries.add(entryFactory.create(queueItem));
             } else {
-                LOG.warn(String.format("queueItem at offset %s not found", refOffset));
+                LOG.warn("queueItem at offset {} not found", refOffset);
             }
         }
         return entries;
@@ -103,19 +104,19 @@ public class PubErrQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry remove(@Nonnull String entryId) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index b6874c5..93f0dca 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -103,8 +103,8 @@ public class PubQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry getHead() {
-        DistributionQueueItem headItem = offsetQueue.getHeadItem();
-        return entryFactory.create(headItem);
+        DistributionQueueItem queueItem = offsetQueue.getHeadItem();
+        return entryFactory.create(queueItem);
     }
 
     @Nonnull
@@ -209,9 +209,9 @@ public class PubQueue implements DistributionQueue {
         return capabilities.contains(capability);
     }
 
-	private Integer attempts(DistributionQueueItem queueItem) {
-		return queueItem.equals(headItem) ? retries : 0;
-	}
+    private int attempts(DistributionQueueItem queueItem) {
+        return queueItem.equals(headItem) ? retries : 0;
+    }
 
     private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
         List<DistributionQueueEntry> removed = new ArrayList<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 438c765..75d71fc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -209,7 +209,7 @@ public class PubQueueCache {
 
         if (requestedMinOffset < cachedMinOffset) {
 
-            LOG.debug(String.format("Requested min offset %s smaller than cached min offset %s", requestedMinOffset, cachedMinOffset));
+            LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
 
             // Fetching data from a topic is a costly
             // operation. In most cases, we expect the queues
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 22e3228..8b253b7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,8 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     @Override
     public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
         OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback callback = editable ? editableCallback(subSlingId, subAgentName) : null;
+        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
+        ClearCallback callback = editable ? editableCallback : null;
         return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
     }
 
 
@@ -132,11 +133,7 @@ public class PubQueueProviderImpl implements PubQueueProvider {
     public void handleStatus(MessageInfo info, PackageStatusMessage message) {
         if (message.getStatus() == REMOVED_FAILED) {
             String errorQueueKey = errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
-            OffsetQueue<Long> errorQueue = errorQueues.get(errorQueueKey);
-            if (errorQueue == null) {
-                errorQueue = new OffsetQueueImpl<>();
-                errorQueues.put(errorQueueKey, errorQueue);
-            }
+            OffsetQueue<Long> errorQueue = errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<>());
             errorQueue.putItem(info.getOffset(), message.getOffset());
         }
     }
@@ -146,18 +143,16 @@ public class PubQueueProviderImpl implements PubQueueProvider {
         return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
     }
 
-    private ClearCallback editableCallback(String subSlingId, String subAgentName) {
-        return (offset) -> {
-            Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
-                    .setOffset(offset)
-                    .build();
-            CommandMessage commandMessage = CommandMessage.newBuilder()
-                    .setSubSlingId(subSlingId)
-                    .setSubAgentName(subAgentName)
-                    .setClearCommand(clearCommand)
-                    .build();
-            sender.send(topics.getCommandTopic(), commandMessage);
-        };
+    private void sendClearCommand(String subSlingId, String subAgentName, long offset) {
+        Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
+                .setOffset(offset)
+                .build();
+        CommandMessage commandMessage = CommandMessage.newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setClearCommand(clearCommand)
+                .build();
+        sender.send(topics.getCommandTopic(), commandMessage);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
index 89f2b2a..47a9f25 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueEntryFactory.java
@@ -22,7 +22,7 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERR
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
 
 import java.util.Calendar;
-import java.util.function.Function;
+import java.util.function.ToIntFunction;
 
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
@@ -31,26 +31,26 @@ import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 
 public class QueueEntryFactory {
-	
-	private final String queueName;
-	private final Function<DistributionQueueItem, Integer> attemptsCallback;
 
-	public QueueEntryFactory(String queueName, Function<DistributionQueueItem, Integer> attemptsCallback) {
-		this.queueName = queueName;
-		this.attemptsCallback = attemptsCallback;
-	}
-	
+    private final String queueName;
+    private final ToIntFunction<DistributionQueueItem> attemptsCallback;
+
+    public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback) {
+        this.queueName = queueName;
+        this.attemptsCallback = attemptsCallback;
+    }
+
     public DistributionQueueEntry create(DistributionQueueItem queueItem) {
-    	if (queueItem == null) {
-    		return null;
-    	}
+        if (queueItem == null) {
+            return null;
+        }
         String entryId = EntryUtil.entryId(queueItem);
         DistributionQueueItemStatus itemStatus = buildQueueItemStatus(queueItem);
         return new DistributionQueueEntry(entryId, queueItem, itemStatus);
     }
 
     private DistributionQueueItemStatus buildQueueItemStatus(DistributionQueueItem queueItem) {
-    	Integer attempts = attemptsCallback.apply(queueItem);
+        int attempts = attemptsCallback.applyAsInt(queueItem);
         DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED;
         return new DistributionQueueItemStatus(itemCalendar(queueItem), state, attempts, queueName);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
index 194439b..a60b9c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
@@ -79,7 +79,7 @@ public class RangePoller {
 
     private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
         long offset = info.getOffset();
-        LOG.debug(String.format("Reading offset %s", offset));
+        LOG.debug("Reading offset {}", offset);
         if (offset < maxOffset) {
             messages.add(new FullMessage<>(info, message));
         } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
index 6659a62..5d34e6d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/SubQueue.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -46,6 +47,8 @@ import static org.apache.sling.distribution.queue.DistributionQueueType.ORDERED;
 @ParametersAreNonnullByDefault
 public class SubQueue implements DistributionQueue {
 
+    private static final String UNSUPPORTED_CLEAR_OPERATION = "Unsupported clear operation";
+
     @SuppressWarnings("unused")
     private static final Logger LOG = LoggerFactory.getLogger(SubQueue.class);
 
@@ -58,6 +61,7 @@ public class SubQueue implements DistributionQueue {
 	private final  QueueEntryFactory entryFactory;
 
     public SubQueue(String queueName,
+                    @Nullable
                     DistributionQueueItem headItem,
                     PackageRetries packageRetries) {
         this.headItem = headItem;
@@ -104,19 +108,19 @@ public class SubQueue implements DistributionQueue {
 
     @Override
     public DistributionQueueEntry remove(String entryId) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> remove(Set<String> entryIds) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
     @Override
     public Iterable<DistributionQueueEntry> clear(int limit) {
-        throw new UnsupportedOperationException("Unsupported clear operation");
+        throw new UnsupportedOperationException(UNSUPPORTED_CLEAR_OPERATION);
     }
 
     @Nonnull
@@ -152,7 +156,7 @@ public class SubQueue implements DistributionQueue {
         return false;
     }
 
-    private Integer attempts(DistributionQueueItem queueItem) {
+    private int attempts(DistributionQueueItem queueItem) {
         String entryId = EntryUtil.entryId(queueItem);
         return packageRetries.get(entryId);
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
index 0e00ba9..858e8dc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/AgentState.java
@@ -35,9 +35,12 @@ import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING
 
 @ParametersAreNonnullByDefault
 public class AgentState {
+    
+    private AgentState() {
+    }
 
     public static DistributionAgentState getState(DistributionAgent agent) {
-        boolean empty = ! queueStatuses(agent).anyMatch(AgentState::queueNotEmpty);
+        boolean empty = queueStatuses(agent).noneMatch(AgentState::queueNotEmpty);
         if (empty) {
             return IDLE;
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 16504e9..f18c04c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -301,7 +301,7 @@ public class DistributionMetricsService {
     }
     
     public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
-        return new GaugeService<T>(name, description, supplier);
+        return new GaugeService<>(name, description, supplier);
     }
 
     private String getMetricName(String component, String name) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
index 870813b..1f4d44a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/Topics.java
@@ -60,8 +60,8 @@ public class Topics {
         this.statusTopic = config.statusTopic();
         this.commandTopic = config.commandTopic();
         this.eventTopic = config.eventTopic();
-        LOG.info(String.format("Topics service started with packageTopic '%s' discoveryTopic '%s' statusTopic '%s' eventTopic '%s' commandTopic '%s'",
-                packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic));
+        LOG.info("Topics service started with packageTopic '{}' discoveryTopic '{}' statusTopic '{}' eventTopic '{}' commandTopic '{}'",
+                packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic);
     }
     
     public String getPackageTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
index ffc27bb..7ecd03b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/Announcer.java
@@ -79,27 +79,28 @@ class Announcer implements Runnable, Closeable {
     public void run() {
         LOG.debug("Sending discovery message for agent {}", subAgentName);
         try {
+            DiscoveryMessage msg = createDiscoveryMessage();
+            sender.accept(msg);
+        } catch (Exception e) {
+            LOG.info("Failed to send discovery message for agent {}, {}", subAgentName, e.getMessage(), e);
+        }
+    }
 
-            long offset = bookKeeper.loadOffset();
-
-            SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
-                    .setEditable(editable)
-                    .setMaxRetries(maxRetries)
-                    .build();
-            DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
-                    .newBuilder()
-                    .setSubSlingId(subSlingId)
-                    .setSubAgentName(subAgentName)
-                    .setSubscriberConfiguration(subscriberConfiguration);
-            for (String pubAgentName : pubAgentNames) {
-                disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
-            }
-
-            sender.accept(disMsgBuilder.build());
-        } catch (Throwable e) {
-            String msg = String.format("Failed to send discovery message for agent %s, %s", subAgentName, e.getMessage());
-            LOG.info(msg, e);
+    private DiscoveryMessage createDiscoveryMessage() {
+        long offset = bookKeeper.loadOffset();
+        SubscriberConfiguration subscriberConfiguration = SubscriberConfiguration.newBuilder()
+                .setEditable(editable)
+                .setMaxRetries(maxRetries)
+                .build();
+        DiscoveryMessage.Builder disMsgBuilder = DiscoveryMessage
+                .newBuilder()
+                .setSubSlingId(subSlingId)
+                .setSubAgentName(subAgentName)
+                .setSubscriberConfiguration(subscriberConfiguration);
+        for (String pubAgentName : pubAgentNames) {
+            disMsgBuilder.addSubscriberState(subscriberState(pubAgentName, offset));
         }
+        return disMsgBuilder.build();
     }
 
     private SubscriberState subscriberState(String pubAgentName, long offset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 669d566..d39e57f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -77,6 +77,7 @@ import org.slf4j.MDC;
  * agent on the leader instance.
  */
 public class BookKeeper implements Closeable {
+    private static final String KEY_OFFSET = "offset";
     private static final String SUBSERVICE_IMPORTER = "importer";
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
@@ -142,8 +143,8 @@ public class BookKeeper implements Closeable {
      * once, thanks to the order in which the content updates are applied.
      */
     public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
-        log.info(format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(),
-                pkgMsg.getReqType(), offset));
+        log.info("Importing distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         addPackageMDC(pkgMsg);
         try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
@@ -197,7 +198,8 @@ public class BookKeeper implements Closeable {
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
         if (errorQueueEnabled && retries >= maxRetries) {
-            log.warn(format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries));
+            log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.", 
+                    pkgMsg.getPkgId(), offset, retries);
             removeFailedPackage(pkgMsg, offset);
         } else {
             packageRetries.increase(pubAgentName);
@@ -207,7 +209,8 @@ public class BookKeeper implements Closeable {
     }
 
     public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
-        log.info(format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        log.info("Removing distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (editable) {
@@ -221,7 +224,7 @@ public class BookKeeper implements Closeable {
     }
     
     public void skipPackage(long offset) throws LoginException, PersistenceException {
-        log.info(format("Skipping package at offset %s", offset));
+        log.info("Skipping package at offset {}", offset);
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeOffset(resolver, offset);
             resolver.commit();
@@ -234,7 +237,7 @@ public class BookKeeper implements Closeable {
             boolean sent = status.sent;
             for (int retry = 0 ; !sent ; retry++) {
                 try {
-                    sendStatusMessage(status, retry);
+                    sendStatusMessage(status);
                     markStatusSent();
                     sent = true;
                 } catch (Exception e) {
@@ -245,7 +248,7 @@ public class BookKeeper implements Closeable {
         }
     }
     
-    private void sendStatusMessage(PackageStatus status, int retry) throws InterruptedException {
+    private void sendStatusMessage(PackageStatus status) {
         PackageStatusMessage pkgStatMsg = PackageStatusMessage.newBuilder()
                 .setSubSlingId(subSlingId)
                 .setSubAgentName(subAgentName)
@@ -267,7 +270,7 @@ public class BookKeeper implements Closeable {
     }
     
     public long loadOffset() {
-        return  processedOffsets.load("offset", -1L);
+        return  processedOffsets.load(KEY_OFFSET, -1L);
     }
 
     public int getRetries(String pubAgentName) {
@@ -284,7 +287,8 @@ public class BookKeeper implements Closeable {
     }
     
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
-        log.info(format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
+        log.info("Removing failed distribution package {} of type {} at offset {}", 
+                pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
@@ -303,7 +307,7 @@ public class BookKeeper implements Closeable {
     }
 
     private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
-        processedOffsets.store(resolver, "offset", offset);
+        processedOffsets.store(resolver, KEY_OFFSET, offset);
     }
 
     private ResourceResolver getServiceResolver(String subService) throws LoginException {
@@ -326,7 +330,7 @@ public class BookKeeper implements Closeable {
         PackageStatus(ValueMap statusMap) {
             Integer statusNum = statusMap.get("statusNumber", Integer.class);
             this.status = statusNum !=null ? Status.valueOf(statusNum) : null;
-            this.offset = statusMap.get("offset", Long.class);
+            this.offset = statusMap.get(KEY_OFFSET, Long.class);
             this.pubAgentName = statusMap.get("pubAgentName", String.class);
             this.sent = statusMap.get("sent", true);
         }
@@ -335,7 +339,7 @@ public class BookKeeper implements Closeable {
             Map<String, Object> s = new HashMap<>();
             s.put("pubAgentName", pubAgentName);
             s.put("statusNumber", status.getNumber());
-            s.put("offset", offset);
+            s.put(KEY_OFFSET, offset);
             s.put("sent", sent);
             return s;
         }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index c05292e..e56828f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import static java.lang.String.format;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
@@ -34,11 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CommandPoller implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CommandPoller.class);
 
     private final String subSlingId;
     private final String subAgentName;
-    private final Closeable commandPoller;
+    private final Closeable poller;
     private final AtomicLong clearOffset = new AtomicLong(-1);
 
     public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) {
@@ -55,12 +54,12 @@ public class CommandPoller implements Closeable {
              * this optimisation will be removed.
              */
 
-            commandPoller = messagingProvider.createPoller(
+            poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
                     create(CommandMessage.class, this::handleCommandMessage));
         } else {
-            commandPoller = null;
+            poller = null;
         }
     }
     
@@ -70,7 +69,7 @@ public class CommandPoller implements Closeable {
 
     private void handleCommandMessage(MessageInfo info, CommandMessage message) {
         if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
-            LOG.debug(format("Skip command for subSlingId %s", message.getSubSlingId()));
+            LOG.debug("Skip command for subSlingId {}", message.getSubSlingId());
             return;
         }
 
@@ -92,6 +91,6 @@ public class CommandPoller implements Closeable {
 
     @Override
     public void close() {
-        IOUtils.closeQuietly(commandPoller);
+        IOUtils.closeQuietly(poller);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 13b59b9..6f6d676 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -259,7 +259,9 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Override
     public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream().filter(item -> isIn(queueName, item)).findFirst()
+        DistributionQueueItem head = queueItemsBuffer.stream()
+                .filter(item -> isIn(queueName, item))
+                .findFirst()
                 .orElse(null);
         return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
index 7307fdc..ae355c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PackageHandler.java
@@ -63,7 +63,7 @@ public class PackageHandler {
 
     private void installAddPackage(ResourceResolver resolver, PackageMessage pkgMsg)
             throws DistributionException {
-        LOG.info("Importing paths " + pkgMsg.getPathsList());
+        LOG.info("Importing paths {}",pkgMsg.getPathsList());
         InputStream pkgStream = null;
         try {
             pkgStream = PackageBrowser.pkgStream(resolver, pkgMsg);