You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/05/07 13:23:21 UTC
svn commit: r1678168 - in
/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution:
queue/impl/ serialization/impl/
Author: mpetria
Date: Thu May 7 11:23:21 2015
New Revision: 1678168
URL: http://svn.apache.org/r1678168
Log:
Fixing concurrency problems for shared distribution packages in multiple queues
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java Thu May 7 11:23:21 2015
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.queue.impl;
+import org.apache.jackrabbit.vault.packaging.PackageManager;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.SharedDistributionPackage;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
@@ -33,6 +34,8 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
+import java.util.UUID;
/**
* The default strategy for delivering packages to queues. Each package can be dispatched to multiple queues.
@@ -58,18 +61,26 @@ public class MultipleQueueDispatchingStr
DistributionQueueItem queueItem = getItem(distributionPackage);
List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
- for (String queueName: queueNames) {
- DistributionQueue queue = queueProvider.getQueue(queueName);
- DistributionPackageUtils.acquire(distributionPackage, queueName);
- DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName());
- if (queue.add(queueItem)) {
- status = queue.getStatus(queueItem);
- }
- else {
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
- }
+ // acquire the package temporarily until all queues are filled
+ String tempQueueName = "temp" + UUID.randomUUID();
+ DistributionPackageUtils.acquire(distributionPackage, tempQueueName);
+
+ try {
+ for (String queueName: queueNames) {
+ DistributionQueue queue = queueProvider.getQueue(queueName);
+ DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName());
+
+ DistributionPackageUtils.acquire(distributionPackage, queueName);
+ if (queue.add(queueItem)) {
+ status = queue.getStatus(queueItem);
+ } else {
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ }
- result.add(status);
+ result.add(status);
+ }
+ } finally {
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, tempQueueName);
}
return result;
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java Thu May 7 11:23:21 2015
@@ -37,6 +37,7 @@ public class ResourceSharedDistributionP
private final Logger log = LoggerFactory.getLogger(getClass());
protected static final String REFERENCE_ROOT_NODE = "refs";
+ private final Object lock;
private final ResourceResolver resourceResolver;
@@ -44,11 +45,12 @@ public class ResourceSharedDistributionP
private final DistributionPackage distributionPackage;
private final String packageName;
- public ResourceSharedDistributionPackage(ResourceResolver resourceResolver, String packageName, String packagePath, DistributionPackage distributionPackage) {
+ public ResourceSharedDistributionPackage(Object lock, ResourceResolver resourceResolver, String packageName, String packagePath, DistributionPackage distributionPackage) {
this.resourceResolver = resourceResolver;
this.packageName = packageName;
this.packagePath = packagePath;
this.distributionPackage = distributionPackage;
+ this.lock = lock;
}
public void acquire(@Nonnull String holderName) {
@@ -58,6 +60,9 @@ public class ResourceSharedDistributionP
try {
createHolderResource(holderName);
+
+ log.debug("acquired package {} for holder {}", new Object[] { packagePath, holderName } );
+
} catch (PersistenceException e) {
log.error("cannot acquire package", e);
}
@@ -72,10 +77,13 @@ public class ResourceSharedDistributionP
try {
deleteHolderResource(holderName);
- Resource holderRoot = getHolderRootResource();
- if (holderRoot != null && !holderRoot.hasChildren()) {
- delete();
+ boolean doPackageDelete = deleteIfEmpty();
+
+ if (doPackageDelete) {
+ distributionPackage.delete();
}
+
+ log.debug("released package {} from holder {} delete {}", new Object[] { packagePath, holderName, doPackageDelete } );
} catch (PersistenceException e) {
log.error("cannot release package", e);
}
@@ -103,13 +111,13 @@ public class ResourceSharedDistributionP
}
public void delete() {
- Resource resource = getProxyResource();
+
try {
- resourceResolver.delete(resource);
- resourceResolver.commit();
+ deleteHolderRoot();
} catch (PersistenceException e) {
log.error("cannot delete shared resource", e);
}
+
distributionPackage.delete();
}
@@ -132,8 +140,6 @@ public class ResourceSharedDistributionP
}
-
-
private Resource getHolderRootResource() {
Resource resource = getProxyResource();
@@ -146,38 +152,65 @@ public class ResourceSharedDistributionP
}
private void createHolderResource(String holderName) throws PersistenceException {
- Resource holderRoot = getHolderRootResource();
- if (holderRoot == null) {
- return;
- }
+ synchronized (lock) {
+ Resource holderRoot = getHolderRootResource();
- Resource holder = holderRoot.getChild(holderName);
+ if (holderRoot == null) {
+ return;
+ }
- if (holder != null) {
- return;
- }
+ Resource holder = holderRoot.getChild(holderName);
+
+ if (holder != null) {
+ return;
+ }
- resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
- resourceResolver.commit();
+ resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
+ resourceResolver.commit();
+ }
}
private void deleteHolderResource(String holderName) throws PersistenceException {
- Resource holderRoot = getHolderRootResource();
- if (holderRoot == null) {
- return;
+ synchronized (lock) {
+ Resource holderRoot = getHolderRootResource();
+
+ if (holderRoot == null) {
+ return;
+ }
+
+ Resource holder = holderRoot.getChild(holderName);
+
+ if (holder == null) {
+ return;
+ }
+
+ resourceResolver.delete(holder);
+ resourceResolver.commit();
}
+ }
- Resource holder = holderRoot.getChild(holderName);
+ private void deleteHolderRoot() throws PersistenceException {
+ synchronized (lock) {
+ Resource resource = getProxyResource();
+ resourceResolver.delete(resource);
+ resourceResolver.commit();
+ }
- if (holder == null) {
- return;
+ }
+
+ private boolean deleteIfEmpty() throws PersistenceException {
+ synchronized (lock) {
+ Resource holderRoot = getHolderRootResource();
+ if (holderRoot != null && !holderRoot.hasChildren()) {
+ deleteHolderRoot();
+ return true;
+ }
}
- resourceResolver.delete(holder);
- resourceResolver.commit();
+ return false;
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java Thu May 7 11:23:21 2015
@@ -49,6 +49,10 @@ public class ResourceSharedDistributionP
private final DistributionPackageBuilder distributionPackageBuilder;
+ // use a global repolock for syncing access to the shared package root
+ // TODO: this can be made finer-grained once configurable package roots are supported
+ private final static Object repolock = new Object();
+
public ResourceSharedDistributionPackageBuilder(DistributionPackageBuilder distributionPackageExporter) {
this.distributionPackageBuilder = distributionPackageExporter;
}
@@ -69,8 +73,7 @@ public class ResourceSharedDistributionP
String packageName = generateNameFromId(resourceResolver, distributionPackage);
String packagePath = getPathFromName(packageName);
-
- return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+ return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
}
catch (PersistenceException e) {
throw new DistributionPackageBuildingException(e);
@@ -89,7 +92,7 @@ public class ResourceSharedDistributionP
String packageName = generateNameFromId(resourceResolver, distributionPackage);
String packagePath = getPathFromName(packageName);
- return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+ return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
}
catch (PersistenceException e) {
throw new DistributionPackageReadingException(e);
@@ -110,7 +113,7 @@ public class ResourceSharedDistributionP
return null;
}
String packagePath = getPathFromName(packageName);
- return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+ return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
}
public boolean installPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage) throws DistributionPackageReadingException {
@@ -125,33 +128,37 @@ public class ResourceSharedDistributionP
}
- private String generateNameFromId(ResourceResolver resourceResolver, DistributionPackage distributionPackage) throws PersistenceException {
- String name = PACKAGE_NAME_PREFIX + "_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
+ private String generateNameFromId(ResourceResolver resourceResolver, DistributionPackage distributionPackage) throws PersistenceException {
- Map<String, Object> properties = new HashMap<String, Object>();
- properties.put(PN_ORIGINAL_ID, distributionPackage.getId());
+ String name = PACKAGE_NAME_PREFIX + "_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
- // save the info just for debugging purposes
- if (distributionPackage.getInfo().getRequestType() != null) {
- properties.put(PN_ORIGINAL_REQUEST_TYPE, distributionPackage.getInfo().getRequestType());
+ Map<String, Object> properties = new HashMap<String, Object>();
+ properties.put(PN_ORIGINAL_ID, distributionPackage.getId());
- }
- if (distributionPackage.getInfo().getPaths() != null) {
- properties.put(PN_ORIGINAL_PATHS, distributionPackage.getInfo().getPaths());
- }
+ // save the info just for debugging purposes
+ if (distributionPackage.getInfo().getRequestType() != null) {
+ properties.put(PN_ORIGINAL_REQUEST_TYPE, distributionPackage.getInfo().getRequestType());
- String packagePath = getPathFromName(name);
+ }
+ if (distributionPackage.getInfo().getPaths() != null) {
+ properties.put(PN_ORIGINAL_PATHS, distributionPackage.getInfo().getPaths());
+ }
- Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath,
- "sling:Folder", "sling:Folder", false);
+ String packagePath = getPathFromName(name);
- ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
- valueMap.putAll(properties);
+ Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath,
+ "sling:Folder", "sling:Folder", false);
- resourceResolver.create(resource, ResourceSharedDistributionPackage.REFERENCE_ROOT_NODE,
- Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"sling:Folder"));
+ ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
+ valueMap.putAll(properties);
+
+ synchronized (repolock) {
+ resourceResolver.create(resource, ResourceSharedDistributionPackage.REFERENCE_ROOT_NODE,
+ Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"sling:Folder"));
+
+ resourceResolver.commit();
+ }
- resourceResolver.commit();
return name;
}