You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2015/05/07 13:23:21 UTC

svn commit: r1678168 - in /sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution: queue/impl/ serialization/impl/

Author: mpetria
Date: Thu May  7 11:23:21 2015
New Revision: 1678168

URL: http://svn.apache.org/r1678168
Log:
Fixing concurrency problems for shared distribution packages in multiple queues

Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java Thu May  7 11:23:21 2015
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.queue.impl;
 
+import org.apache.jackrabbit.vault.packaging.PackageManager;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.SharedDistributionPackage;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
@@ -33,6 +34,8 @@ import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
+import java.util.UUID;
 
 /**
  * The default strategy for delivering packages to queues. Each package can be dispatched to multiple queues.
@@ -58,18 +61,26 @@ public class MultipleQueueDispatchingStr
         DistributionQueueItem queueItem = getItem(distributionPackage);
         List<DistributionQueueItemStatus> result = new ArrayList<DistributionQueueItemStatus>();
 
-        for (String queueName: queueNames) {
-            DistributionQueue queue = queueProvider.getQueue(queueName);
-            DistributionPackageUtils.acquire(distributionPackage, queueName);
-            DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName());
-            if (queue.add(queueItem)) {
-                 status = queue.getStatus(queueItem);
-            }
-            else {
-                DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
-            }
+        // acquire the package temporarily until all queues are filled
+        String tempQueueName = "temp" + UUID.randomUUID();
+        DistributionPackageUtils.acquire(distributionPackage, tempQueueName);
+
+        try {
+            for (String queueName: queueNames) {
+                DistributionQueue queue = queueProvider.getQueue(queueName);
+                DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemStatus.ItemState.ERROR, queue.getName());
+
+                DistributionPackageUtils.acquire(distributionPackage, queueName);
+                if (queue.add(queueItem)) {
+                    status = queue.getStatus(queueItem);
+                } else {
+                    DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+                }
 
-            result.add(status);
+                result.add(status);
+            }
+        } finally {
+            DistributionPackageUtils.releaseOrDelete(distributionPackage, tempQueueName);
         }
 
         return result;

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackage.java Thu May  7 11:23:21 2015
@@ -37,6 +37,7 @@ public class ResourceSharedDistributionP
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     protected static final String  REFERENCE_ROOT_NODE = "refs";
+    private final Object lock;
 
 
     private final ResourceResolver resourceResolver;
@@ -44,11 +45,12 @@ public class ResourceSharedDistributionP
     private final DistributionPackage distributionPackage;
     private final String packageName;
 
-    public ResourceSharedDistributionPackage(ResourceResolver resourceResolver, String packageName, String packagePath, DistributionPackage distributionPackage) {
+    public ResourceSharedDistributionPackage(Object lock, ResourceResolver resourceResolver, String packageName, String packagePath, DistributionPackage distributionPackage) {
         this.resourceResolver = resourceResolver;
         this.packageName = packageName;
         this.packagePath = packagePath;
         this.distributionPackage = distributionPackage;
+        this.lock = lock;
     }
 
     public void acquire(@Nonnull String holderName) {
@@ -58,6 +60,9 @@ public class ResourceSharedDistributionP
         
         try {
             createHolderResource(holderName);
+
+            log.debug("acquired package {} for holder {}", new Object[] { packagePath, holderName } );
+
         } catch (PersistenceException e) {
             log.error("cannot acquire package", e);
         }
@@ -72,10 +77,13 @@ public class ResourceSharedDistributionP
         try {
             deleteHolderResource(holderName);
 
-            Resource holderRoot = getHolderRootResource();
-            if (holderRoot != null && !holderRoot.hasChildren()) {
-                delete();
+            boolean doPackageDelete = deleteIfEmpty();
+
+            if (doPackageDelete) {
+                distributionPackage.delete();
             }
+
+            log.debug("released package {} from holder {} delete {}", new Object[] { packagePath, holderName, doPackageDelete } );
         } catch (PersistenceException e) {
             log.error("cannot release package", e);
         }
@@ -103,13 +111,13 @@ public class ResourceSharedDistributionP
     }
 
     public void delete() {
-        Resource resource = getProxyResource();
+
         try {
-            resourceResolver.delete(resource);
-            resourceResolver.commit();
+            deleteHolderRoot();
         } catch (PersistenceException e) {
             log.error("cannot delete shared resource", e);
         }
+
         distributionPackage.delete();
     }
 
@@ -132,8 +140,6 @@ public class ResourceSharedDistributionP
     }
 
 
-
-
     private Resource getHolderRootResource()  {
         Resource resource = getProxyResource();
 
@@ -146,38 +152,65 @@ public class ResourceSharedDistributionP
     }
 
     private void createHolderResource(String holderName) throws PersistenceException {
-        Resource holderRoot = getHolderRootResource();
 
-        if (holderRoot == null) {
-            return;
-        }
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
 
-        Resource holder = holderRoot.getChild(holderName);
+            if (holderRoot == null) {
+                return;
+            }
 
-        if (holder != null) {
-            return;
-        }
+            Resource holder = holderRoot.getChild(holderName);
+
+            if (holder != null) {
+                return;
+            }
 
-        resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
-        resourceResolver.commit();
+            resourceResolver.create(holderRoot, holderName, Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) "sling:Folder"));
+            resourceResolver.commit();
 
+        }
     }
 
     private void deleteHolderResource(String holderName) throws PersistenceException {
-        Resource holderRoot = getHolderRootResource();
 
-        if (holderRoot == null) {
-            return;
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
+
+            if (holderRoot == null) {
+                return;
+            }
+
+            Resource holder = holderRoot.getChild(holderName);
+
+            if (holder == null) {
+                return;
+            }
+
+            resourceResolver.delete(holder);
+            resourceResolver.commit();
         }
+    }
 
-        Resource holder = holderRoot.getChild(holderName);
+    private void deleteHolderRoot() throws PersistenceException {
+        synchronized (lock) {
+            Resource resource = getProxyResource();
+            resourceResolver.delete(resource);
+            resourceResolver.commit();
+        }
 
-        if (holder == null) {
-            return;
+    }
+
+    private boolean deleteIfEmpty() throws PersistenceException {
+        synchronized (lock) {
+            Resource holderRoot = getHolderRootResource();
+            if (holderRoot != null && !holderRoot.hasChildren()) {
+                deleteHolderRoot();
+                return true;
+            }
         }
 
-        resourceResolver.delete(holder);
-        resourceResolver.commit();
+        return false;
     }
 
 }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java?rev=1678168&r1=1678167&r2=1678168&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/ResourceSharedDistributionPackageBuilder.java Thu May  7 11:23:21 2015
@@ -49,6 +49,10 @@ public class ResourceSharedDistributionP
 
     private final DistributionPackageBuilder distributionPackageBuilder;
 
+    // use a global repolock for syncing access to the shared package root
+    // TODO: this can be finegrained when we will allow configurable package roots
+    private final static Object repolock = new Object();
+
     public ResourceSharedDistributionPackageBuilder(DistributionPackageBuilder distributionPackageExporter) {
         this.distributionPackageBuilder = distributionPackageExporter;
     }
@@ -69,8 +73,7 @@ public class ResourceSharedDistributionP
             String packageName = generateNameFromId(resourceResolver, distributionPackage);
             String packagePath = getPathFromName(packageName);
 
-
-            return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+            return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
         }
         catch (PersistenceException e) {
             throw new DistributionPackageBuildingException(e);
@@ -89,7 +92,7 @@ public class ResourceSharedDistributionP
             String packageName = generateNameFromId(resourceResolver, distributionPackage);
             String packagePath = getPathFromName(packageName);
 
-            return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+            return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
         }
         catch (PersistenceException e) {
             throw new DistributionPackageReadingException(e);
@@ -110,7 +113,7 @@ public class ResourceSharedDistributionP
             return null;
         }
         String packagePath = getPathFromName(packageName);
-        return new ResourceSharedDistributionPackage(resourceResolver, packageName, packagePath, distributionPackage);
+        return new ResourceSharedDistributionPackage(repolock, resourceResolver, packageName, packagePath, distributionPackage);
     }
 
     public boolean installPackage(@Nonnull ResourceResolver resourceResolver, @Nonnull DistributionPackage distributionPackage) throws DistributionPackageReadingException {
@@ -125,33 +128,37 @@ public class ResourceSharedDistributionP
     }
 
 
-    private String generateNameFromId(ResourceResolver resourceResolver, DistributionPackage distributionPackage) throws PersistenceException {
-        String name = PACKAGE_NAME_PREFIX + "_" + System.currentTimeMillis() + "_" +  UUID.randomUUID();
+    private  String generateNameFromId(ResourceResolver resourceResolver, DistributionPackage distributionPackage) throws PersistenceException {
 
-        Map<String, Object> properties = new HashMap<String, Object>();
-        properties.put(PN_ORIGINAL_ID, distributionPackage.getId());
+            String name = PACKAGE_NAME_PREFIX + "_" + System.currentTimeMillis() + "_" +  UUID.randomUUID();
 
-        // save the info just for debugging purposes
-        if (distributionPackage.getInfo().getRequestType() != null) {
-            properties.put(PN_ORIGINAL_REQUEST_TYPE, distributionPackage.getInfo().getRequestType());
+            Map<String, Object> properties = new HashMap<String, Object>();
+            properties.put(PN_ORIGINAL_ID, distributionPackage.getId());
 
-        }
-        if (distributionPackage.getInfo().getPaths() != null) {
-            properties.put(PN_ORIGINAL_PATHS, distributionPackage.getInfo().getPaths());
-        }
+            // save the info just for debugging purposes
+            if (distributionPackage.getInfo().getRequestType() != null) {
+                properties.put(PN_ORIGINAL_REQUEST_TYPE, distributionPackage.getInfo().getRequestType());
 
-        String packagePath = getPathFromName(name);
+            }
+            if (distributionPackage.getInfo().getPaths() != null) {
+                properties.put(PN_ORIGINAL_PATHS, distributionPackage.getInfo().getPaths());
+            }
 
-        Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath,
-                "sling:Folder", "sling:Folder", false);
+            String packagePath = getPathFromName(name);
 
-        ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
-        valueMap.putAll(properties);
+            Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, packagePath,
+                    "sling:Folder", "sling:Folder", false);
 
-        resourceResolver.create(resource, ResourceSharedDistributionPackage.REFERENCE_ROOT_NODE,
-                Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"sling:Folder"));
+            ModifiableValueMap valueMap = resource.adaptTo(ModifiableValueMap.class);
+            valueMap.putAll(properties);
+
+        synchronized (repolock) {
+            resourceResolver.create(resource, ResourceSharedDistributionPackage.REFERENCE_ROOT_NODE,
+                    Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)"sling:Folder"));
+
+            resourceResolver.commit();
+        }
 
-        resourceResolver.commit();
         return name;
     }