You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/12/20 21:12:56 UTC

[sling-org-apache-sling-distribution-core] branch master updated: SLING-8945 ActiveResourceQueue doesn't provide cluster-consistent processing-attempts view * refactors the code a bit to contain status recording within queue instances themselves, freeing Queue Providers from the responsibility of maintaining them for the queues they create

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d9c04  SLING-8945 ActiveResourceQueue doesn't provide cluster-consistent processing-attempts view * refactors the code a bit to contain status recording within queue instances themselves, freeing Queue Providers   from the responsibility of maintaining them for the queues they create
     new 537bb57  Merge pull request #31 from actinium15/issue/SLING-8945
78d9c04 is described below

commit 78d9c04ee9075f0b3970f0087cc34ca59482529f
Author: Ashish Chopra <as...@adobe.com>
AuthorDate: Fri Dec 20 23:11:10 2019 +0530

    SLING-8945 ActiveResourceQueue doesn't provide cluster-consistent processing-attempts view
    * refactors the code a bit to contain status recording within queue instances themselves, freeing Queue Providers
      from the responsibility of maintaining them for the queues they create
---
 .../queue/impl/resource/ActiveResourceQueue.java   | 16 ++++++++++++
 .../queue/impl/resource/ResourceQueueProvider.java | 27 +++++++++-----------
 .../queue/impl/resource/ResourceQueueUtils.java    | 11 +++++++-
 .../queue/impl/simple/SimpleDistributionQueue.java | 28 ++++++++++++---------
 .../simple/SimpleDistributionQueueProcessor.java   | 29 +++++++++++-----------
 .../simple/SimpleDistributionQueueProvider.java    | 22 ++++++++--------
 .../agent/impl/SimpleDistributionAgentTest.java    | 11 ++++----
 .../impl/resource/ResourceQueueProcessingTest.java |  2 +-
 .../SimpleDistributionQueueProcessorTest.java      |  8 +++---
 .../impl/simple/SimpleDistributionQueueTest.java   | 25 ++++++++++++-------
 10 files changed, 106 insertions(+), 73 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
index 6a1339c..1921c22 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
@@ -71,4 +71,20 @@ public class ActiveResourceQueue extends ResourceQueue {
             DistributionUtils.safelyLogout(resourceResolver);
         }
     }
+
+    public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) { // best-effort: failures are logged, never thrown
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+            Resource queueItemResource = ResourceQueueUtils.getResourceById(queueRoot, entry.getId());
+            ResourceQueueUtils.incrementProcessingAttemptForQueueItem(queueItemResource);
+            resourceResolver.commit(); // persist the incremented counter on the queue item resource
+            log.debug("incremented processing-attempt for queue entry with id: {}", entry.getId());
+        } catch (Exception e) {
+            log.warn("Couldn't increment processing-attempt for queue entry with id: {}", entry.getId(), e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index 221501a..f35f2ce 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -23,7 +23,7 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
@@ -40,6 +40,7 @@ import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 public class ResourceQueueProvider implements DistributionQueueProvider {
     public static final String TYPE = "resource";
@@ -57,9 +58,6 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
 
     private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
 
-    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
-
-
     private ServiceRegistration<Runnable> cleanupTask;
 
     public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
@@ -81,13 +79,11 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
     @NotNull
     @Override
     public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
-        String key = getKey(queueName);
-        return queueMap.computeIfAbsent(key, k -> {
-            statusMap.put(key, new ConcurrentHashMap<>());
+        return queueMap.computeIfAbsent(queueName, name -> {
             if (isActive) {
-                return new ActiveResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+                return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
             } else {
-                return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+                return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
             }
         });
     }
@@ -111,8 +107,13 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
                         .canRunConcurrently(false)
                         .onSingleInstanceOnly(true)
                         .name(getJobName(queueName));
-                scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
-                        statusMap.get(getKey(queueName))), options);
+                DistributionQueue queueImpl = getQueue(queueName);
+                Consumer<DistributionQueueEntry> processingAttemptRecorder = null;
+                if (isActive) {
+                    processingAttemptRecorder = ((ActiveResourceQueue)queueImpl)::recordProcessingAttempt;
+                }
+                scheduler.schedule(new SimpleDistributionQueueProcessor(queueImpl, queueProcessor, processingAttemptRecorder),
+                        options);
             }
         } else {
             throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
@@ -150,10 +151,6 @@ public class ResourceQueueProvider implements DistributionQueueProvider {
         cleanupTask = context.registerService(Runnable.class, cleanup, props);
     }
 
-    private String getKey(String queueName) {
-        return agentName + "#" + queueName;
-    }
-
     public void close() {
         if (cleanupTask != null) {
             cleanupTask.unregister();
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
index a54767e..ccd76a2 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
@@ -20,6 +20,7 @@
 package org.apache.sling.distribution.queue.impl.resource;
 
 
+import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -70,6 +71,7 @@ public class ResourceQueueUtils {
     private static final String DISTRIBUTION_PACKAGE_ID = DISTRIBUTION_PACKAGE_PREFIX + "item.id";
     private static final String DISTRIBUTION_PACKAGE_SIZE = DISTRIBUTION_PACKAGE_PREFIX + "package.size";
     private static final String ENTERED_DATE = "entered.date";
+    private static final String PROCESSING_ATTEMPTS = "processing.attempts";
 
 
     private static final AtomicLong itemCounter = new AtomicLong(0);
@@ -145,8 +147,9 @@ public class ResourceQueueUtils {
         ValueMap valueMap = resource.getValueMap();
         DistributionQueueItem queueItem = deserializeItem(valueMap);
         Calendar entered = valueMap.get(ENTERED_DATE, Calendar.getInstance());
+        int attempts = valueMap.get(PROCESSING_ATTEMPTS, 0);
         DistributionQueueItemStatus queueItemStatus = new DistributionQueueItemStatus(entered,
-                DistributionQueueItemState.QUEUED, 0, queueName);
+                DistributionQueueItemState.QUEUED, attempts, queueName);
 
         String entryId = getIdFromPath(queueRoot.getPath(), resource.getPath());
 
@@ -420,4 +423,10 @@ public class ResourceQueueUtils {
         return itemId.replace(ID_START, "").replace("--", "/");
     }
 
+    public static void incrementProcessingAttemptForQueueItem(Resource queueItemResource) {
+        ModifiableValueMap vm = queueItemResource.adaptTo(ModifiableValueMap.class); // adaptTo returns null if not adaptable
+        if (vm == null) { throw new IllegalStateException("queue item is not modifiable: " + queueItemResource.getPath()); }
+        vm.put(PROCESSING_ATTEMPTS, vm.get(PROCESSING_ATTEMPTS, 0) + 1); // not committed here; the caller commits
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 6b4a2c1..85a08b1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -67,13 +68,13 @@ public class SimpleDistributionQueue implements DistributionQueue {
 
     private final Queue<DistributionQueueItem> queue;
 
-    private final Map<String, DistributionQueueItemStatus> statusMap;
+    private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
 
-    public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
+    public SimpleDistributionQueue(String agentName, String name) {
         log.debug("starting a simple queue {} for agent {}", name, agentName);
         this.name = name;
         this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
-        this.statusMap = statusMap;
+        this.statusMap = java.util.Collections.synchronizedMap(new WeakHashMap<>(10)); // WeakHashMap alone is unsynchronized
     }
 
     @NotNull
@@ -84,19 +85,17 @@ public class SimpleDistributionQueue implements DistributionQueue {
     public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
         DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
         boolean result = false;
-        String entryId = item.getPackageId();
         try {
             result = queue.offer(item);
             itemState = DistributionQueueItemState.QUEUED;
         } catch (Exception e) {
             log.error("cannot add an item to the queue", e);
         } finally {
-            statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+            statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
         }
 
         if (result) {
-            
-            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
+            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
         }
 
         return null;
@@ -107,7 +106,7 @@ public class SimpleDistributionQueue implements DistributionQueue {
     public DistributionQueueEntry getHead() {
         DistributionQueueItem element = queue.peek();
         if (element != null) {
-            DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
+            DistributionQueueItemStatus itemState = statusMap.get(element);
 
             return new DistributionQueueEntry(element.getPackageId(), element, itemState);
         }
@@ -117,7 +116,7 @@ public class SimpleDistributionQueue implements DistributionQueue {
     @NotNull
     private DistributionQueueState getState() {
         DistributionQueueItem firstItem = queue.peek();
-        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
+        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
         return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
     }
 
@@ -144,7 +143,7 @@ public class SimpleDistributionQueue implements DistributionQueue {
         List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
 
         for (DistributionQueueItem item : queue) {
-            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
+            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
         }
         return result;
     }
@@ -153,7 +152,7 @@ public class SimpleDistributionQueue implements DistributionQueue {
     public DistributionQueueEntry getEntry(@NotNull String id) {
         for (DistributionQueueItem item : queue) {
             if (id.equals(item.getPackageId())) {
-                return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
+                return new DistributionQueueEntry(id, item, statusMap.get(item));
             }
         }
 
@@ -180,7 +179,6 @@ public class SimpleDistributionQueue implements DistributionQueue {
         boolean removed = false;
         if (toRemove != null) {
             removed = queue.remove(toRemove.getItem());
-            statusMap.remove(id);
         }
         log.debug("item with id {} removed from the queue: {}", id, removed);
         if (removed) {
@@ -210,4 +208,10 @@ public class SimpleDistributionQueue implements DistributionQueue {
         return removedEntries;
     }
 
+    public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) {
+        statusMap.computeIfPresent(entry.getItem(), (item, status) -> { // no-op when no status is recorded for the item
+            return new DistributionQueueItemStatus(status.getEntered(), // replaces the status with a copy whose attempt count is one higher
+                    status.getItemState(), status.getAttempts() + 1, status.getQueueName());
+        });
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 462895e..3cf267b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -20,10 +20,8 @@ package org.apache.sling.distribution.queue.impl.simple;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
-import java.util.Map;
-
+import java.util.function.Consumer;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,34 +34,35 @@ public class SimpleDistributionQueueProcessor implements Runnable {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final DistributionQueue queue;
     private final DistributionQueueProcessor queueProcessor;
-    private final Map<String, DistributionQueueItemStatus> statusMap;
+    private Consumer<DistributionQueueEntry> recordProcessingAttempt;
 
     public SimpleDistributionQueueProcessor(DistributionQueue queue,
                                             DistributionQueueProcessor queueProcessor,
-                                            Map<String, DistributionQueueItemStatus> statusMap) {
+                                            Consumer<DistributionQueueEntry> processingAttemptRecorder) {
         this.queue = queue;
         this.queueProcessor = queueProcessor;
-        this.statusMap = statusMap;
+        this.recordProcessingAttempt = (null != processingAttemptRecorder)?
+                processingAttemptRecorder:
+                (entry) -> {};
     }
 
     public void run() {
         try {
             DistributionQueueEntry entry;
             while ((entry = queue.getHead()) != null) {
-                DistributionQueueItemStatus itemStatus = entry.getStatus();
-                statusMap.put(entry.getId(),  new DistributionQueueItemStatus(itemStatus.getEntered(),
-                        itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
-                if (queueProcessor.process(queue.getName(), entry)) {
-                    if (queue.remove(entry.getId()) != null) {
-                        log.debug("item {} processed and removed from the queue", entry.getItem());
-                    }
+                // An attempt is recorded whenever the entry was not both processed and removed;
+                // removal is only tried after successful processing (short-circuit below).
+                boolean wasProcessed = queueProcessor.process(queue.getName(), entry);
+                boolean wasRemoved = wasProcessed && queue.remove(entry.getId()) != null;
+                if (wasRemoved) {
+                    log.debug("item {} processed and removed from the queue", entry.getItem());
                 } else {
-                    log.warn("processing of item {} failed", entry.getId());
+                    log.warn("processing and removal of item {} failed; will reattempt", entry.getId());
+                    this.recordProcessingAttempt.accept(entry);
                 }
             }
         } catch (Exception e) {
             log.error("error while processing queue {}", e);
         }
-
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index bc3d3c7..9315ee0 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -27,14 +27,15 @@ import java.io.FilenameFilter;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,8 +57,7 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
     private final String name;
     private final Scheduler scheduler;
 
-    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
-    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
+    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
     private final boolean checkpoint;
     private File checkpointDirectory;
 
@@ -92,11 +92,8 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
         SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
-            Map<String, DistributionQueueItemStatus> queueStatusMap
-                    = new ConcurrentHashMap<>();
-            queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
+            queue = new SimpleDistributionQueue(name, queueName);
             queueMap.put(key, queue);
-            statusMap.put(key, queueStatusMap);
             log.debug("queue created {}", queue);
         }
         return queue;
@@ -155,9 +152,14 @@ public class SimpleDistributionQueueProvider implements DistributionQueueProvide
 
         // enable processing
         for (String queueName : queueNames) {
-            ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
-            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
-                    statusMap.get(getKey(queueName))), options);
+            ScheduleOptions options = scheduler.NOW(-1, 1)
+                    .canRunConcurrently(false)
+                    .name(getJobName(queueName));
+            DistributionQueue queueImpl = getQueue(queueName);
+            Consumer<DistributionQueueEntry> processingAttemptRecorder =
+                    ((SimpleDistributionQueue)queueImpl)::recordProcessingAttempt;
+            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor, processingAttemptRecorder),
+                    options);
         }
 
     }
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index 433d2ff..aae6f7b 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,6 @@ package org.apache.sling.distribution.agent.impl;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +124,7 @@ public class SimpleDistributionAgentTest {
 
 
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +164,7 @@ public class SimpleDistributionAgentTest {
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +204,7 @@ public class SimpleDistributionAgentTest {
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         agent.execute(resourceResolver, request);
     }
@@ -298,7 +297,7 @@ public class SimpleDistributionAgentTest {
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
@@ -344,7 +343,7 @@ public class SimpleDistributionAgentTest {
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
index 68d08fa..6e6630f 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -87,7 +87,7 @@ public class ResourceQueueProcessingTest {
             assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
 
             when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
-                .thenReturn(true);
+                .thenReturn(false, true);
 
             resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
             while (!resourceQueue.getStatus().getState().equals(DistributionQueueState.IDLE)) {
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index b219f2d..ec7d5c5 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.queue.impl.simple;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -28,7 +29,6 @@ import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.junit.Test;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -42,7 +42,7 @@ public class SimpleDistributionQueueProcessorTest {
         DistributionQueue queue = mock(DistributionQueue.class);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, null);
         simpleDistributionQueueProcessor.run();
     }
 
@@ -56,7 +56,7 @@ public class SimpleDistributionQueueProcessorTest {
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, null);
         simpleDistributionQueueProcessor.run();
     }
 
@@ -73,7 +73,7 @@ public class SimpleDistributionQueueProcessorTest {
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, queue::recordProcessingAttempt);
         simpleDistributionQueueProcessor.run();
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index aadd3a4..169b871 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,6 +19,7 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import java.util.HashMap;
+
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -38,8 +39,7 @@ public class SimpleDistributionQueueTest {
 
     @Test
     public void testPackageAddition() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -47,8 +47,7 @@ public class SimpleDistributionQueueTest {
 
     @Test
     public void testPackageAdditionAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -60,18 +59,26 @@ public class SimpleDistributionQueueTest {
 
     @Test
     public void testPackageAdditionRetrievalAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
-        assertEquals(pkg, queue.getHead().getItem());
-        assertFalse(queue.getStatus().isEmpty());
+        DistributionQueueEntry entry = queue.getHead();
+
         DistributionQueueItemStatus status = queue.getEntry(pkg.getPackageId()).getStatus();
+        assertNotNull(status);
+        assertEquals(0, status.getAttempts());
+
+        ((SimpleDistributionQueue)queue).recordProcessingAttempt(entry);
+
+        assertEquals(pkg, entry.getItem());
+        assertFalse(queue.getStatus().isEmpty());
+
+        status = queue.getEntry(pkg.getPackageId()).getStatus();
         assertNotNull(queue.remove(pkg.getPackageId()));
         assertTrue(queue.getStatus().isEmpty());
         assertNotNull(status);
-        assertEquals(0, status.getAttempts());
+        assertEquals(1, status.getAttempts());
     }
 
 }