You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2016/04/13 13:42:52 UTC
svn commit: r1738924 - in /sling/trunk/contrib/extensions/distribution:
core/src/main/java/org/apache/sling/distribution/agent/impl/
core/src/main/java/org/apache/sling/distribution/queue/impl/
core/src/main/java/org/apache/sling/distribution/queue/imp...
Author: mpetria
Date: Wed Apr 13 11:42:52 2016
New Revision: 1738924
URL: http://svn.apache.org/viewvc?rev=1738924&view=rev
Log:
SLING-5576: optimizations for distribution queue items listings
Added:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java
sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Wed Apr 13 11:42:52 2016
@@ -58,6 +58,7 @@ import org.apache.sling.distribution.log
import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
+import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue;
import org.apache.sling.distribution.serialization.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.DistributionPackageImporter;
@@ -305,18 +306,8 @@ public class SimpleDistributionAgent imp
}
if (queue != null) {
- queue = new DistributionQueueWrapper(queue) {
- @Nonnull
- @Override
- public DistributionQueueStatus getStatus() {
- DistributionQueueStatus status = super.getStatus();
- if (!queueProcessingEnabled && (processingQueues != null && processingQueues.contains(queueName))) {
- return new DistributionQueueStatus(status.getItemsCount(), DistributionQueueState.PAUSED);
- } else {
- return status;
- }
- }
- };
+ boolean isPausedQueue = !queueProcessingEnabled && (processingQueues != null && processingQueues.contains(queueName));
+ queue = new SimpleAgentDistributionQueue(queue, isPausedQueue, name);
}
return queue;
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/CachingDistributionQueue.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A {@link DistributionQueueWrapper} that caches the status of the wrapped queue.
+ *
+ * The cache is static and therefore shared by every instance created with the same
+ * cache key. A cached status is served until it is older than
+ * {@link #EXPIRY_QUEUE_CACHE} milliseconds or until an item is added to or removed
+ * from the queue through this wrapper.
+ */
+public class CachingDistributionQueue extends DistributionQueueWrapper {
+    // cache status for 30 sec as it is expensive to count items
+    private static final int EXPIRY_QUEUE_CACHE = 30 * 1000;
+
+    // cached statuses and their expiry timestamps (epoch millis), both keyed by cacheKey
+    static final Map<String, DistributionQueueStatus> queueCache = new ConcurrentHashMap<String, DistributionQueueStatus>();
+    static final Map<String, Long> queueCacheExpiry = new ConcurrentHashMap<String, Long>();
+    private final String cacheKey;
+
+    /**
+     * @param cacheKey     key under which the status of the wrapped queue is cached
+     * @param wrappedQueue the queue all calls are delegated to
+     */
+    public CachingDistributionQueue(String cacheKey, DistributionQueue wrappedQueue) {
+        super(wrappedQueue);
+        this.cacheKey = cacheKey;
+    }
+
+    /**
+     * Returns the cached status when a non-expired one exists, otherwise asks the
+     * wrapped queue and caches the answer.
+     *
+     * @return the (possibly cached) status of the wrapped queue
+     */
+    @Nonnull
+    @Override
+    public DistributionQueueStatus getStatus() {
+        long now = System.currentTimeMillis();
+
+        // A status without an expiry entry can be left behind when getStatus() interleaves
+        // with add()/remove(); treat it as expired so it cannot be served forever.
+        Long expiryDate = queueCacheExpiry.get(cacheKey);
+        if (expiryDate == null || expiryDate < now) {
+            invalidate();
+        }
+
+        DistributionQueueStatus cachedStatus = queueCache.get(cacheKey);
+        if (cachedStatus != null) {
+            return cachedStatus;
+        }
+
+        DistributionQueueStatus freshStatus = wrappedQueue.getStatus();
+
+        // defensive: ConcurrentHashMap rejects null values
+        if (freshStatus != null) {
+            // store the expiry first so a concurrent reader never sees a status without one
+            queueCacheExpiry.put(cacheKey, System.currentTimeMillis() + EXPIRY_QUEUE_CACHE);
+            queueCache.put(cacheKey, freshStatus);
+        }
+
+        return freshStatus;
+    }
+
+    /**
+     * Adds the item to the wrapped queue and drops the cached status.
+     */
+    @Override
+    public DistributionQueueEntry add(@Nonnull DistributionQueueItem item) {
+        try {
+            return super.add(item);
+        } finally {
+            // invalidate after the mutation so a status computed while the item
+            // was still being added is not kept for the whole expiry period
+            invalidate();
+        }
+    }
+
+    /**
+     * Removes the item from the wrapped queue and drops the cached status.
+     */
+    @Override
+    public DistributionQueueEntry remove(@Nonnull String itemId) {
+        try {
+            return super.remove(itemId);
+        } finally {
+            invalidate();
+        }
+    }
+
+    /**
+     * Removes the cached status and its expiry timestamp for this queue's cache key.
+     */
+    private void invalidate() {
+        queueCache.remove(cacheKey);
+        queueCacheExpiry.remove(cacheKey);
+    }
+}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java Wed Apr 13 11:42:52 2016
@@ -26,7 +26,7 @@ import org.apache.sling.distribution.que
import javax.annotation.Nonnull;
public abstract class DistributionQueueWrapper implements DistributionQueue {
- private final DistributionQueue wrappedQueue;
+ protected final DistributionQueue wrappedQueue;
public DistributionQueueWrapper(DistributionQueue wrappedQueue) {
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/SimpleAgentDistributionQueue.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl;
+
+
+import org.apache.sling.distribution.queue.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A {@link DistributionQueueWrapper} that reports the wrapped queue as
+ * {@link DistributionQueueState#PAUSED} when it was created with the paused flag set.
+ */
+public class SimpleAgentDistributionQueue extends DistributionQueueWrapper {
+
+    private final boolean isPaused;
+    private final String agentName;
+
+    /**
+     * @param wrappedQueue the queue all calls are delegated to
+     * @param isPaused     whether the status should be reported as paused
+     * @param agentName    name of the agent owning the queue
+     */
+    public SimpleAgentDistributionQueue(DistributionQueue wrappedQueue, boolean isPaused, String agentName) {
+        super(wrappedQueue);
+        this.agentName = agentName;
+        this.isPaused = isPaused;
+    }
+
+    /**
+     * Returns the status of the wrapped queue; for a paused queue the items count
+     * is kept and the state is replaced by {@link DistributionQueueState#PAUSED}.
+     */
+    @Nonnull
+    @Override
+    public DistributionQueueStatus getStatus() {
+        DistributionQueueStatus wrappedStatus = super.getStatus();
+        if (!isPaused) {
+            return wrappedStatus;
+        }
+        return new DistributionQueueStatus(wrappedStatus.getItemsCount(), DistributionQueueState.PAUSED);
+    }
+}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/DistributionAgentJobConsumer.java Wed Apr 13 11:42:52 2016
@@ -45,7 +45,6 @@ class DistributionAgentJobConsumer imple
boolean processingResult = false;
if (entry != null) {
String queueName = entry.getStatus().getQueueName();
- DistributionQueueItem item = entry.getItem();
log.debug("processing item {} in queue {}", entry.getId(), queueName);
processingResult = queueProcessor.process(queueName, entry);
log.debug("item {} processed {}", entry.getId());
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java Wed Apr 13 11:42:52 2016
@@ -31,6 +31,7 @@ import org.apache.sling.distribution.com
import org.apache.sling.distribution.queue.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.CachingDistributionQueue;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.framework.BundleContext;
@@ -69,7 +70,9 @@ public class JobHandlingDistributionQueu
String topic = JobHandlingDistributionQueue.DISTRIBUTION_QUEUE_TOPIC + '/' + name + "/" + queueName;
boolean isActive = jobConsumer != null && (processingQueueNames == null || processingQueueNames.contains(queueName));
- return new JobHandlingDistributionQueue(queueName, topic, jobManager, isActive);
+ DistributionQueue queue = new JobHandlingDistributionQueue(queueName, topic, jobManager, isActive);
+ queue = new CachingDistributionQueue(topic, queue);
+ return queue;
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/ExtendedDistributionServiceResourceProvider.java Wed Apr 13 11:42:52 2016
@@ -21,8 +21,11 @@ package org.apache.sling.distribution.re
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.sling.distribution.agent.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -51,7 +54,9 @@ public class ExtendedDistributionService
private static final String STATUS_PATH = "status";
- private static final int MAX_QUEUE_DEPTH = 100;
+ private static final int MAX_QUEUE_LENGTH = 5000;
+ private static final int MAX_QUEUE_CHUNK = 100;
+
public ExtendedDistributionServiceResourceProvider(String kind,
@@ -145,14 +150,14 @@ public class ExtendedDistributionService
}
List<String> nameList = new ArrayList<String>();
- Map<String, Map<String, Object>> propertiesMap = new HashMap<String, Map<String, Object>>();
- for (DistributionQueueEntry entry : queue.getItems(0, MAX_QUEUE_DEPTH)) {
+
+ DistributionQueueEntry entry = queue.getHead();
+ if (entry != null) {
nameList.add(entry.getId());
- propertiesMap.put(entry.getId(), getItemProperties(entry));
}
result.put(ITEMS, nameList.toArray(new String[nameList.size()]));
- result.put(INTERNAL_ITEMS_PROPERTIES, propertiesMap);
+ result.put(INTERNAL_ITEMS_ITERATOR, new QueueItemsIterator(queue));
result.put(INTERNAL_ADAPTABLE, queue);
}
@@ -206,4 +211,49 @@ public class ExtendedDistributionService
}
+
+    /**
+     * Iterates over the property maps of a queue's entries, fetching the entries from
+     * the queue in chunks of {@code MAX_QUEUE_CHUNK} and stopping after
+     * {@code MAX_QUEUE_LENGTH} entries have been returned.
+     */
+    class QueueItemsIterator implements Iterator<Map<String, Object>> {
+
+        private final DistributionQueue queue;
+        // iterator over the chunk currently being served; null until the first fetch
+        private Iterator<DistributionQueueEntry> items;
+        // number of entries returned so far, also used as offset of the next chunk
+        private int fetched = 0;
+
+        QueueItemsIterator(DistributionQueue queue) {
+            this.queue = queue;
+        }
+
+        /**
+         * @return {@code true} if fewer than {@code MAX_QUEUE_LENGTH} entries were returned
+         *         and the queue yields another entry at the current offset
+         */
+        @Override
+        public boolean hasNext() {
+            // '>=' so that at most MAX_QUEUE_LENGTH entries are served
+            if (fetched >= MAX_QUEUE_LENGTH) {
+                return false;
+            }
+
+            // current chunk missing or exhausted: ask the queue for the next one
+            if (items == null || !items.hasNext()) {
+                items = queue.getItems(fetched, MAX_QUEUE_CHUNK).iterator();
+            }
+
+            return items.hasNext();
+        }
+
+        /**
+         * @return the properties of the next entry, with its id stored under {@code INTERNAL_NAME}
+         * @throws java.util.NoSuchElementException if there is no further entry
+         */
+        @Override
+        public Map<String, Object> next() {
+            // also makes sure a chunk is loaded when the caller did not invoke hasNext() first
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+
+            DistributionQueueEntry queueEntry = items.next();
+            String itemName = queueEntry.getId();
+            Map<String, Object> itemProperties = getItemProperties(queueEntry);
+            itemProperties.put(INTERNAL_NAME, itemName);
+
+            fetched++;
+            return itemProperties;
+        }
+
+        /**
+         * Delegates to the iterator of the current chunk.
+         * NOTE(review): whether this affects the underlying queue depends on the
+         * collection returned by {@code getItems} - confirm before relying on it.
+         *
+         * @throws IllegalStateException if no chunk has been fetched yet
+         */
+        @Override
+        public void remove() {
+            if (items == null) {
+                throw new IllegalStateException("next() has not been called");
+            }
+            items.remove();
+        }
+    }
+
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/AbstractReadableResourceProvider.java Wed Apr 13 11:42:52 2016
@@ -40,7 +40,9 @@ public abstract class AbstractReadableRe
protected static final String INTERNAL_ADAPTABLE = "internal:adaptable";
- protected static final String INTERNAL_ITEMS_PROPERTIES = "internal:propertiesMap";
+    // key under which an item's name travels inside its properties map; it must differ
+    // from INTERNAL_ADAPTABLE ("internal:adaptable") so the two entries cannot collide
+    public static final String INTERNAL_NAME = "internal:name";
+
+ protected static final String INTERNAL_ITEMS_ITERATOR = "internal:itemsIterator";
protected static final String ITEMS = "items";
@@ -79,7 +81,7 @@ public abstract class AbstractReadableRe
if (properties != null) {
Object adaptable = properties.remove(INTERNAL_ADAPTABLE);
- properties.remove(INTERNAL_ITEMS_PROPERTIES);
+ properties.remove(INTERNAL_ITEMS_ITERATOR);
resource = buildMainResource(resourceResolver, pathInfo, properties, adaptable);
}
@@ -137,7 +139,7 @@ public abstract class AbstractReadableRe
List<Resource> resourceList = new ArrayList<Resource>();
Iterable<String> childrenList = getResourceChildren(pathInfo);
- Map<String, Map<String, Object>> childrenProperties = new HashMap<String, Map<String, Object>>();
+ Iterator<Map<String,Object>> childrenProperties = null;
if (childrenList == null) {
Map<String, Object> properties = getResourceProperties(pathInfo);
@@ -148,28 +150,22 @@ public abstract class AbstractReadableRe
childrenList = Arrays.asList(itemsArray);
}
- if (properties != null && properties.containsKey(INTERNAL_ITEMS_PROPERTIES)) {
- childrenProperties = (Map) properties.get(INTERNAL_ITEMS_PROPERTIES);
+ if (properties != null && properties.containsKey(INTERNAL_ITEMS_ITERATOR)) {
+ childrenProperties = (Iterator<Map<String,Object>>) properties.get(INTERNAL_ITEMS_ITERATOR);
}
}
- if (childrenList != null) {
+ if (childrenProperties != null) {
+ return new SimpleReadableResourceIterator(childrenProperties, resourceResolver, path);
+ } else if (childrenList != null) {
for (String childResourceName : childrenList) {
-
- Resource childResource;
- if (childrenProperties.containsKey(childResourceName)) {
- Map<String, Object> childProperties = childrenProperties.get(childResourceName);
- SimplePathInfo childPathInfo = extractPathInfo(path + "/" + childResourceName);
- childResource = buildMainResource(resourceResolver, childPathInfo, childProperties);
-
- } else {
- childResource = getResource(resourceResolver, path + "/" + childResourceName);
-
- }
+ Resource childResource = getResource(resourceResolver, path + "/" + childResourceName);;
resourceList.add(childResource);
}
}
+
+
return resourceList.listIterator();
}
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java?rev=1738924&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java (added)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/resources/impl/common/SimpleReadableResourceIterator.java Wed Apr 13 11:42:52 2016
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.resources.impl.common;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Iterator that converts property maps into {@link SimpleReadableResource}s
+ * placed below a common parent path. The name of each resource is taken from
+ * (and removed from) the map entry stored under
+ * {@link AbstractReadableResourceProvider#INTERNAL_NAME}.
+ */
+public class SimpleReadableResourceIterator implements Iterator<Resource> {
+    private final Iterator<Map<String, Object>> itemsIterator;
+    private final ResourceResolver resourceResolver;
+    private final String parentPath;
+
+    /**
+     * @param itemsIterator    source of the property maps, one per resource
+     * @param resourceResolver resolver handed to every created resource
+     * @param parentPath       path the resource names are appended to
+     */
+    SimpleReadableResourceIterator(Iterator<Map<String, Object>> itemsIterator, ResourceResolver resourceResolver, String parentPath) {
+        this.parentPath = parentPath;
+        this.resourceResolver = resourceResolver;
+        this.itemsIterator = itemsIterator;
+    }
+
+    /** Reports whether the underlying iterator has another property map. */
+    @Override
+    public boolean hasNext() {
+        return itemsIterator.hasNext();
+    }
+
+    /** Builds the resource for the next property map. */
+    @Override
+    public Resource next() {
+        Map<String, Object> properties = itemsIterator.next();
+        // the name is transport-only data, so it is taken out of the properties
+        String name = (String) properties.remove(AbstractReadableResourceProvider.INTERNAL_NAME);
+        return new SimpleReadableResource(resourceResolver, parentPath + "/" + name, properties);
+    }
+
+    /** Delegates the removal to the underlying iterator. */
+    @Override
+    public void remove() {
+        itemsIterator.remove();
+    }
+}
Modified: sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java?rev=1738924&r1=1738923&r2=1738924&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/it/src/test/java/org/apache/sling/distribution/it/DistributionUtils.java Wed Apr 13 11:42:52 2016
@@ -344,24 +344,25 @@ public class DistributionUtils {
JSONObject json = getResource(instance, queueUrl + ".infinity");
- JSONArray items = json.getJSONArray("items");
- for(int i=0; i < items.length(); i++) {
- String itemId = items.getString(i);
- JSONObject queueItem = json.getJSONObject(itemId);
+ Iterator<String> keys = json.keys();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ JSONObject queueItem = json.optJSONObject(key);
+ if (queueItem != null && queueItem.optString("id") != null) {
+
+ Map<String, Object> itemProperties = new HashMap<String, Object>();
+
+ itemProperties.put("id", queueItem.get("id"));
+ itemProperties.put("paths", queueItem.get("paths"));
+ itemProperties.put("action", queueItem.get("action"));
+ itemProperties.put("userid", queueItem.get("userid"));
+ itemProperties.put("attempts", queueItem.get("attempts"));
+ itemProperties.put("time", queueItem.get("time"));
+ itemProperties.put("state", queueItem.get("state"));
-
- Map<String, Object> itemProperties = new HashMap<String, Object>();
-
- itemProperties.put("id", queueItem.get("id"));
- itemProperties.put("paths", queueItem.get("paths"));
- itemProperties.put("action", queueItem.get("action"));
- itemProperties.put("userid", queueItem.get("userid"));
- itemProperties.put("attempts", queueItem.get("attempts"));
- itemProperties.put("time", queueItem.get("time"));
- itemProperties.put("state", queueItem.get("state"));
-
- result.add(itemProperties);
+ result.add(itemProperties);
+ }
}
return result;