You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2016/04/13 13:42:52 UTC

svn commit: r1738924 - in /sling/trunk/contrib/extensions/distribution: core/src/main/java/org/apache/sling/distribution/agent/impl/ core/src/main/java/org/apache/sling/distribution/queue/impl/ core/src/main/java/org/apache/sling/distribution/queue/imp...

Author: mpetria
Date: Wed Apr 13 11:42:52 2016
New Revision: 1738924

URL: http://svn.apache.org/viewvc?rev=1738924&view=rev
Log:
SLING-5576: optimizations for distribution queue items listings

Added:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java
Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java
    sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Wed Apr 13 11:42:52 2016
@@ -58,6 +58,7 @@ import org.apache.sling.distribution.log
 import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
+import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
 import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -305,18 +306,8 @@ public class SimpleDistributionAgent imp
         }
 
         if (queue != null) {
-            queue = new DistributionQueueWrapper(queue) {
-                @Nonnull
-                @Override
-                public DistributionQueueStatus getStatus() {
-                    DistributionQueueStatus status = super.getStatus();
-                    if (!queueProcessingEnabled && (processingQueues != null && processingQueues.contains(queueName))) {
-                        return new DistributionQueueStatus(status.getItemsCount(), DistributionQueueState.PAUSED);
-                    } else {
-                        return status;
-                    }
-                }
-            };
+            boolean isPausedQueue = !queueProcessingEnabled && (processingQueues != null && processingQueues.contains(queueName));
+            queue = new SimpleAgentDistributionQueue(queue, isPausedQueue, name);
         }
 
         return queue;

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Caches the wrapped queue's status per cache key for 30 seconds; add/remove through this wrapper drop the cached entry. */
+public class CachingDistributionQueue extends DistributionQueueWrapper {
+    // cache status for 30 sec as it is expensive to count items
+    private static final int EXPIRY_QUEUE_CACHE = 30 * 1000;
+    // static: one cache shared by every instance, looked up by cacheKey
+    static Map<String, DistributionQueueStatus> queueCache = new ConcurrentHashMap<String, DistributionQueueStatus>();
+    static Map<String, Long> queueCacheExpiry = new ConcurrentHashMap<String, Long>();
+    private final String cacheKey;
+
+    public CachingDistributionQueue(String cacheKey, DistributionQueue wrappedQueue) {
+        super(wrappedQueue);
+        this.cacheKey = cacheKey;
+    }
+
+    /** Returns the cached status while it is fresh; otherwise asks the wrapped queue and caches any non-null answer. */
+    @Override
+    public DistributionQueueStatus getStatus() {
+        Long expiryDate = queueCacheExpiry.get(cacheKey);
+        if (expiryDate != null && expiryDate < System.currentTimeMillis()) {
+            dropCachedStatus();
+        }
+        DistributionQueueStatus queueStatus = queueCache.get(cacheKey);
+        if (queueStatus == null) {
+            queueStatus = wrappedQueue.getStatus();
+            if (queueStatus != null) {
+                queueCache.put(cacheKey, queueStatus);
+                queueCacheExpiry.put(cacheKey, System.currentTimeMillis() + EXPIRY_QUEUE_CACHE);
+            }
+        }
+        return queueStatus;
+    }
+
+    @Override
+    public DistributionQueueEntry add(@Nonnull DistributionQueueItem item) {
+        try {
+            return super.add(item);
+        } finally {
+            // drop the entry after the change: dropping it first lets a concurrent getStatus() re-cache the old count
+            dropCachedStatus();
+        }
+    }
+
+    @Override
+    public DistributionQueueEntry remove(@Nonnull String itemId) {
+        try {
+            return super.remove(itemId);
+        } finally {
+            dropCachedStatus();
+        }
+    }
+
+    private void dropCachedStatus() {
+        queueCache.remove(cacheKey);
+        queueCacheExpiry.remove(cacheKey);
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java Wed Apr 13 11:42:52 2016
@@ -26,7 +26,7 @@ import org.apache.sling.distribution.que
 import javax.annotation.Nonnull;
 
 public abstract class DistributionQueueWrapper implements DistributionQueue {
-    private final DistributionQueue wrappedQueue;
+    protected final DistributionQueue wrappedQueue;
 
     public DistributionQueueWrapper(DistributionQueue wrappedQueue) {
 

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl;
+
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Queue wrapper that reports the PAUSED state instead of the wrapped state when it was constructed as paused. */
+public class SimpleAgentDistributionQueue extends DistributionQueueWrapper {
+
+    private final boolean isPaused;
+    private final String agentName;
+
+    public SimpleAgentDistributionQueue(DistributionQueue wrappedQueue, boolean isPaused, String agentName) {
+        super(wrappedQueue);
+        this.agentName = agentName;
+        this.isPaused = isPaused;
+    }
+
+    /**
+     * Returns the status obtained from the parent wrapper unchanged unless this queue is paused;
+     * a paused queue keeps that status' item count but reports the PAUSED state.
+     */
+    @Nonnull
+    @Override
+    public DistributionQueueStatus getStatus() {
+        final DistributionQueueStatus parentStatus = super.getStatus();
+        if (!isPaused) {
+            return parentStatus;
+        }
+        // only the state is replaced; the item count is carried over as is
+        return new DistributionQueueStatus(parentStatus.getItemsCount(), DistributionQueueState.PAUSED);
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java Wed Apr 13 11:42:52 2016
@@ -45,7 +45,6 @@ class DistributionAgentJobConsumer imple
         boolean processingResult = false;
         if (entry != null) {
             String queueName = entry.getStatus().getQueueName();
-            DistributionQueueItem item = entry.getItem();
             log.debug("processing item {} in queue {}", entry.getId(), queueName);
             processingResult = queueProcessor.process(queueName, entry);
             log.debug("item {} processed {}", entry.getId());

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java Wed Apr 13 11:42:52 2016
@@ -31,6 +31,7 @@ import org.apache.sling.distribution.com
 import org.apache.sling.distribution.queue.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.CachingDistributionQueue;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.framework.BundleContext;
@@ -69,7 +70,9 @@ public class JobHandlingDistributionQueu
         String topic = JobHandlingDistributionQueue.DISTRIBUTION_QUEUE_TOPIC + '/' + name + "/" + queueName;
         boolean isActive = jobConsumer != null && (processingQueueNames == null || processingQueueNames.contains(queueName));
 
-        return new JobHandlingDistributionQueue(queueName, topic, jobManager, isActive);
+        DistributionQueue queue = new JobHandlingDistributionQueue(queueName, topic, jobManager, isActive);
+        queue = new CachingDistributionQueue(topic, queue);
+        return queue;
     }
 
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java Wed Apr 13 11:42:52 2016
@@ -21,8 +21,11 @@ package org.apache.sling.distribution.re
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -51,7 +54,9 @@ public class ExtendedDistributionService
     private static final String STATUS_PATH = "status";
 
 
-    private static final int MAX_QUEUE_DEPTH = 100;
+    private static final int MAX_QUEUE_LENGTH = 5000;
+    private static final int MAX_QUEUE_CHUNK = 100;
+
 
 
     public ExtendedDistributionServiceResourceProvider(String kind,
@@ -145,14 +150,14 @@ public class ExtendedDistributionService
                 }
 
                 List<String> nameList = new ArrayList<String>();
-                Map<String, Map<String, Object>> propertiesMap = new HashMap<String, Map<String, Object>>();
-                for (DistributionQueueEntry entry : queue.getItems(0, MAX_QUEUE_DEPTH)) {
+
+                DistributionQueueEntry entry = queue.getHead();
+                if (entry != null) {
                     nameList.add(entry.getId());
-                    propertiesMap.put(entry.getId(), getItemProperties(entry));
                 }
 
                 result.put(ITEMS, nameList.toArray(new String[nameList.size()]));
-                result.put(INTERNAL_ITEMS_PROPERTIES, propertiesMap);
+                result.put(INTERNAL_ITEMS_ITERATOR, new QueueItemsIterator(queue));
                 result.put(INTERNAL_ADAPTABLE, queue);
             }
 
@@ -206,4 +211,49 @@ public class ExtendedDistributionService
 
     }
 
+
+    /**
+     * Reads the queue lazily in chunks of MAX_QUEUE_CHUNK and yields at most MAX_QUEUE_LENGTH property maps, each holding the entry id under INTERNAL_NAME.
+     */
+    class QueueItemsIterator implements Iterator<Map<String, Object>> {
+        private final DistributionQueue queue;
+        private Iterator<DistributionQueueEntry> items;
+        int fetched = 0;
+
+        QueueItemsIterator(DistributionQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // ">=" rather than ">": stop after exactly MAX_QUEUE_LENGTH items instead of letting one more through
+            if (fetched >= MAX_QUEUE_LENGTH) {
+                return false;
+            }
+            if (items == null || !items.hasNext()) {
+                // no chunk yet or chunk used up: ask for the next one, passing the count already returned as the start
+                items = queue.getItems(fetched, MAX_QUEUE_CHUNK).iterator();
+            }
+            return items.hasNext();
+        }
+
+        @Override
+        public Map<String, Object> next() {
+            if (!hasNext()) {
+                // Iterator contract: signal exhaustion explicitly instead of failing with a NullPointerException
+                throw new java.util.NoSuchElementException();
+            }
+            DistributionQueueEntry queueEntry = items.next();
+            Map<String, Object> itemProperties = getItemProperties(queueEntry);
+            itemProperties.put(INTERNAL_NAME, queueEntry.getId());
+            fetched++;
+            return itemProperties;
+        }
+
+        @Override
+        public void remove() {
+            items.remove();
+        }
+    }
+
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java Wed Apr 13 11:42:52 2016
@@ -40,7 +40,9 @@ public abstract class AbstractReadableRe
 
     protected static final String INTERNAL_ADAPTABLE = "internal:adaptable";
 
-    protected static final String INTERNAL_ITEMS_PROPERTIES = "internal:propertiesMap";
+    /** Key under which an item's name is carried inside its property map; its value must differ from INTERNAL_ADAPTABLE. */
+    public static final String INTERNAL_NAME = "internal:name";
+    protected static final String INTERNAL_ITEMS_ITERATOR = "internal:itemsIterator";
 
     protected static final String ITEMS = "items";
 
@@ -79,7 +81,7 @@ public abstract class AbstractReadableRe
 
         if (properties != null) {
             Object adaptable = properties.remove(INTERNAL_ADAPTABLE);
-            properties.remove(INTERNAL_ITEMS_PROPERTIES);
+            properties.remove(INTERNAL_ITEMS_ITERATOR);
 
             resource = buildMainResource(resourceResolver, pathInfo, properties, adaptable);
         }
@@ -137,7 +139,7 @@ public abstract class AbstractReadableRe
 
         List<Resource> resourceList = new ArrayList<Resource>();
         Iterable<String> childrenList = getResourceChildren(pathInfo);
-        Map<String, Map<String, Object>> childrenProperties = new HashMap<String, Map<String, Object>>();
+        Iterator<Map<String,Object>> childrenProperties = null;
 
         if (childrenList == null) {
             Map<String, Object> properties = getResourceProperties(pathInfo);
@@ -148,28 +150,22 @@ public abstract class AbstractReadableRe
                 childrenList = Arrays.asList(itemsArray);
             }
 
-            if (properties != null && properties.containsKey(INTERNAL_ITEMS_PROPERTIES)) {
-                childrenProperties = (Map) properties.get(INTERNAL_ITEMS_PROPERTIES);
+            if (properties != null && properties.containsKey(INTERNAL_ITEMS_ITERATOR)) {
+                childrenProperties = (Iterator<Map<String,Object>>) properties.get(INTERNAL_ITEMS_ITERATOR);
             }
         }
 
-        if (childrenList != null) {
+        if (childrenProperties != null) {
+            return new SimpleReadableResourceIterator(childrenProperties, resourceResolver, path);
+        } else if (childrenList != null) {
             for (String childResourceName : childrenList) {
-
-                Resource childResource;
-                if (childrenProperties.containsKey(childResourceName)) {
-                    Map<String, Object> childProperties = childrenProperties.get(childResourceName);
-                    SimplePathInfo childPathInfo = extractPathInfo(path + "/" + childResourceName);
-                    childResource = buildMainResource(resourceResolver, childPathInfo, childProperties);
-
-                } else {
-                    childResource = getResource(resourceResolver, path + "/" + childResourceName);
-
-                }
+                Resource childResource = getResource(resourceResolver, path + "/" + childResourceName);;
                 resourceList.add(childResource);
             }
         }
 
+
+
         return resourceList.listIterator();
     }
 

Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.resources.impl.common;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Iterator that turns each property map supplied by the underlying iterator into a
+ * {@link SimpleReadableResource} whose path is the parent path plus the item's name.
+ */
+public class SimpleReadableResourceIterator implements Iterator<Resource> {
+    private final Iterator<Map<String, Object>> itemsIterator;
+    private final ResourceResolver resourceResolver;
+    private final String parentPath;
+
+    SimpleReadableResourceIterator(Iterator<Map<String, Object>> itemsIterator, ResourceResolver resourceResolver, String parentPath) {
+        this.parentPath = parentPath;
+        this.resourceResolver = resourceResolver;
+        this.itemsIterator = itemsIterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return itemsIterator.hasNext();
+    }
+
+    @Override
+    public Resource next() {
+        final Map<String, Object> properties = itemsIterator.next();
+        // the item name arrives inside the map and is taken out, so the map handed to the resource no longer holds it
+        final String childName = (String) properties.remove(AbstractReadableResourceProvider.INTERNAL_NAME);
+        return new SimpleReadableResource(resourceResolver, parentPath + "/" + childName, properties);
+    }
+
+    @Override
+    public void remove() {
+        // removal is forwarded to the underlying iterator
+        itemsIterator.remove();
+    }
+}

Modified: sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java Wed Apr 13 11:42:52 2016
@@ -344,24 +344,25 @@ public class DistributionUtils {
 
         JSONObject json = getResource(instance, queueUrl + ".infinity");
 
-        JSONArray items = json.getJSONArray("items");
 
-        for(int i=0; i < items.length(); i++) {
-            String itemId = items.getString(i);
-            JSONObject queueItem = json.getJSONObject(itemId);
+        Iterator<String> keys = json.keys();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            JSONObject queueItem = json.optJSONObject(key);
+            if (queueItem != null && queueItem.optString("id") != null) {
+
+                Map<String, Object> itemProperties = new HashMap<String, Object>();
+
+                itemProperties.put("id", queueItem.get("id"));
+                itemProperties.put("paths", queueItem.get("paths"));
+                itemProperties.put("action", queueItem.get("action"));
+                itemProperties.put("userid", queueItem.get("userid"));
+                itemProperties.put("attempts", queueItem.get("attempts"));
+                itemProperties.put("time", queueItem.get("time"));
+                itemProperties.put("state", queueItem.get("state"));
 
-
-            Map<String, Object> itemProperties = new HashMap<String, Object>();
-
-            itemProperties.put("id", queueItem.get("id"));
-            itemProperties.put("paths", queueItem.get("paths"));
-            itemProperties.put("action", queueItem.get("action"));
-            itemProperties.put("userid", queueItem.get("userid"));
-            itemProperties.put("attempts", queueItem.get("attempts"));
-            itemProperties.put("time", queueItem.get("time"));
-            itemProperties.put("state", queueItem.get("state"));
-
-            result.add(itemProperties);
+                result.add(itemProperties);
+            }
         }
 
         return result;