You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2016/01/18 16:21:08 UTC

svn commit: r1725287 - in /sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution: agent/impl/ packaging/impl/ servlet/ transport/impl/

Author: mpetria
Date: Mon Jan 18 15:21:08 2016
New Revision: 1725287

URL: http://svn.apache.org/viewvc?rev=1725287&view=rev
Log:
SLING-5436: do not move distribution packages to error queues on connection errors

Modified:
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java
    sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Mon Jan 18 15:21:08 2016
@@ -45,38 +45,40 @@ import org.apache.sling.distribution.Dis
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgent;
 import org.apache.sling.distribution.agent.DistributionAgentState;
-import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.common.RecoverableDistributionException;
 import org.apache.sling.distribution.component.impl.DistributionComponentKind;
 import org.apache.sling.distribution.component.impl.SettingsUtils;
 import org.apache.sling.distribution.event.DistributionEventTopics;
 import org.apache.sling.distribution.event.impl.DistributionEventFactory;
 import org.apache.sling.distribution.impl.CompositeDistributionResponse;
+import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.impl.SimpleDistributionResponse;
 import org.apache.sling.distribution.log.DistributionLog;
 import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
+import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageExporter;
 import org.apache.sling.distribution.packaging.DistributionPackageImporter;
-import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.queue.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
+
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
-import org.apache.sling.distribution.serialization.DistributionPackage;
 import org.apache.sling.distribution.trigger.DistributionRequestHandler;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.distribution.util.impl.DistributionUtils;
 import org.apache.sling.jcr.api.SlingRepository;
 import org.apache.sling.jcr.resource.JcrResourceConstants;
 
+
 /**
  * Basic implementation of a {@link org.apache.sling.distribution.agent.DistributionAgent}
  */
@@ -176,7 +178,7 @@ public class SimpleDistributionAgent imp
 
         ResourceResolver agentResourceResolver = null;
 
-        final String requestId = "DSTRQ" + nextRequestId.incrementAndGet();
+        final String requestId = "DSTRQ"+ nextRequestId.incrementAndGet();
         String callingUser = resourceResolver.getUserID();
 
         try {
@@ -194,7 +196,7 @@ public class SimpleDistributionAgent imp
 
             boolean silent = DistributionRequestType.PULL.equals(distributionRequest.getRequestType());
 
-            log.info(silent, "START {} {} paths={}, user={}", new Object[]{ requestId,
+            log.info(silent, "REQUEST-START {}: {} paths={}, user={}",  new Object[]{ requestId,
                     distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser});
 
             // check permissions
@@ -205,7 +207,8 @@ public class SimpleDistributionAgent imp
             // export packages
             CompositeDistributionResponse distributionResponse = exportPackages(agentResourceResolver, distributionRequest, callingUser, requestId);
 
-            log.debug("STARTED {} {} paths={}, success={}, state={}, exportTime={}ms, noPackages={}, size={}B, noQueues={}", new Object[]{
+
+            log.debug("REQUEST-STARTED {}: {} paths={}, success={}, state={}, exportTime={}ms, noPackages={}, size={}B, noQueues={}", new Object[]{
                     requestId,
                     distributionRequest.getRequestType(), distributionRequest.getPaths(),
                     distributionResponse.isSuccessful(), distributionResponse.getState(),
@@ -216,7 +219,7 @@ public class SimpleDistributionAgent imp
 
             return distributionResponse;
         } catch (DistributionException e) {
-            log.error("FAILED {} {} paths={}, user={}, message={}", new Object[]{requestId,
+            log.error("REQUEST-FAIL {}: {} paths={}, user={}, message={}", new Object[]{requestId,
                     distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser, e.getMessage()});
             throw e;
         } finally {
@@ -273,11 +276,12 @@ public class SimpleDistributionAgent imp
             distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
         }
 
-        log.debug("SCHEDULED packageId={}, info={}", distributionPackage.getId(), distributionPackage.getInfo());
+        log.debug("PACKAGE-QUEUED {}: packageId={}, info={}", requestId, distributionPackage.getId(), distributionPackage.getInfo());
 
         return distributionResponses;
     }
 
+
     @Nonnull
     public Set<String> getQueueNames() {
         Set<String> queueNames = new TreeSet<String>();
@@ -413,7 +417,7 @@ public class SimpleDistributionAgent imp
     }
 
     private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException {
-        boolean success = false;
+        boolean removeItemFromQueue = false;
         ResourceResolver agentResourceResolver = null;
         DistributionPackage distributionPackage = null;
         DistributionQueueItem queueItem = queueEntry.getItem();
@@ -437,48 +441,46 @@ public class SimpleDistributionAgent imp
 
                 DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry);
 
-                if (processPackage(agentResourceResolver, distributionPackage)) {
-                    success = true;
-                    DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+                try {
+                    distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
                     generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
-                } else if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
-                    success = reEnqueuePackage(distributionPackage);
-                    DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
-                }
-
-                final long endTime = System.currentTimeMillis();
+                    removeItemFromQueue = true;
+                    final long endTime = System.currentTimeMillis();
 
-                log.info("END[{}] {} {} paths={}, time={}ms, importTime={}ms, size={}B", new Object[]{
-                        queueName, requestId,
-                        requestType, paths,
-                        endTime - globalStartTime, endTime - startTime,
-                        packageSize
-                });
+                    log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, time={}ms, importTime={}ms, size={}B", new Object[] {
+                            queueName, requestId,
+                            requestType, paths,
+                            endTime - globalStartTime, endTime - startTime,
+                            packageSize
+                    });
+                } catch (RecoverableDistributionException e) {
+                    log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage());
+                    log.debug("could not deliver package {}", distributionPackage.getId(), e);
+                } catch (Throwable e) {
+                    log.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
+
+                    if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
+                        removeItemFromQueue = reEnqueuePackage(distributionPackage);
+                        log.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId());
+                    }
+                }
             } else {
-                success = true; // return success if package does not exist in order to clear the queue.
+                removeItemFromQueue = true; // return success if package does not exist in order to clear the queue.
                 log.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getId());
             }
         } finally {
-            ungetAgentResourceResolver(agentResourceResolver);
             DistributionPackageUtils.closeSafely(distributionPackage);
-        }
-        return success;
-    }
-
-    private boolean processPackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
-        try {
-            distributionPackageImporter.importPackage(resourceResolver, distributionPackage);
-        } catch (RecoverableDistributionException e) {
-            log.debug("could not deliver package {}", distributionPackage.getId(), e);
-            return false;
-        } catch (DistributionException e) {
-            log.error("could not deliver package {}", distributionPackage.getId(), e);
-            return false;
+            if (removeItemFromQueue) {
+                DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+            }
+            ungetAgentResourceResolver(agentResourceResolver);
         }
 
-        return true;
+        // return true if item should be removed from queue
+        return removeItemFromQueue;
     }
 
+
     private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
 
         if (errorQueueStrategy == null) {
@@ -492,7 +494,6 @@ public class SimpleDistributionAgent imp
             return false;
         }
 
-        log.debug("distribution package {} was reenqueued", distributionPackage.getId());
         return true;
     }
 
@@ -530,6 +531,7 @@ public class SimpleDistributionAgent imp
                 DistributionUtils.safelyLogout(resourceResolver);
             }
         }
+
     }
 
     private void generatePackageEvent(String topic, DistributionPackage... distributionPackages) {
@@ -600,16 +602,16 @@ public class SimpleDistributionAgent imp
             DistributionQueueItem queueItem = queueEntry.getItem();
 
             try {
-                log.debug("PROCESSING[{}] item={}", queueName, queueItem);
+                log.debug("[{}] ITEM-PROCESS processing item={}", queueName, queueItem);
 
                 boolean success = processQueueItem(queueName, queueEntry);
 
-                log.debug("PROCESSED[{}] item={}, status={}", queueName, queueItem, success);
+                log.debug("[{}] ITEM-PROCESSED item={}, status={}", queueName, queueItem, success);
 
                 return success;
 
             } catch (Throwable t) {
-                log.error("FAILED[{}] item{}", queueName, queueItem, t);
+                log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t);
                 return false;
             }
         }

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java Mon Jan 18 15:21:08 2016
@@ -72,16 +72,23 @@ public class DistributionPackageUtils {
      * @param queueName the name of the queue from which it should be eventually released
      */
     public static void releaseOrDelete(DistributionPackage distributionPackage, String queueName) {
-        if (distributionPackage instanceof SharedDistributionPackage) {
-            if (queueName != null) {
-                ((SharedDistributionPackage) distributionPackage).release(queueName);
-                log.debug("package {} released from queue {}", distributionPackage.getId(), queueName);
+        if (distributionPackage == null) {
+            return;
+        }
+        try {
+            if (distributionPackage instanceof SharedDistributionPackage) {
+                if (queueName != null) {
+                    ((SharedDistributionPackage) distributionPackage).release(queueName);
+                    log.debug("package {} released from queue {}", distributionPackage.getId(), queueName);
+                } else {
+                    log.error("package {} cannot be released from null queue", distributionPackage.getId());
+                }
             } else {
-                log.error("package {} cannot be released from null queue", distributionPackage.getId());
+                deleteSafely(distributionPackage);
+                log.debug("package {} deleted", distributionPackage.getId());
             }
-        } else {
-            deleteSafely(distributionPackage);
-            log.debug("package {} deleted", distributionPackage.getId());
+        } catch (Throwable t) {
+            log.error("cannot release package {}", t);
         }
     }
 

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java Mon Jan 18 15:21:08 2016
@@ -56,13 +56,17 @@ public class DistributionPackageImporter
         InputStream stream = request.getInputStream();
         ResourceResolver resourceResolver = request.getResourceResolver();
         try {
+            if (request.getParameter("forceError") != null) {
+                throw new Exception("manually forced error");
+            }
+            
             DistributionPackageInfo distributionPackageInfo = distributionPackageImporter.importStream(resourceResolver, stream);
 
             log.info("Package {} imported successfully", distributionPackageInfo);
             ServletJsonUtils.writeJson(response, 200, "package imported successfully");
 
         } catch (final Throwable e) {
-            ServletJsonUtils.writeJson(response, 400, "an unexpected error has occurred during distribution import");
+            ServletJsonUtils.writeJson(response, 500, "an unexpected error has occurred during distribution import");
             log.error("Error during distribution import", e);
         } finally {
             long end = System.currentTimeMillis();

Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java Mon Jan 18 15:21:08 2016
@@ -30,6 +30,7 @@ import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpHost;
+import org.apache.http.client.HttpResponseException;
 import org.apache.http.client.fluent.Content;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
@@ -104,15 +105,20 @@ public class SimpleHttpDistributionTrans
                 }
 
                 Content content = response.returnContent();
-                log.debug("DELIVERED packageId={}, endpoint={}", distributionPackage.getId(), distributionEndpoint.getUri());
+                log.debug("delivered packageId={}, endpoint={}", distributionPackage.getId(), distributionEndpoint.getUri());
             } catch (HttpHostConnectException e) {
-                log.debug("could not connect to {} - retrying", distributionEndpoint.getUri());
-                throw new RecoverableDistributionException(e);
-            } catch (Exception ex) {
-                throw new DistributionException(ex);
+                throw new RecoverableDistributionException("endpoint not available " + distributionEndpoint.getUri(), e);
+            } catch (HttpResponseException e) {
+                int statusCode = e.getStatusCode();
+                if (statusCode == 404 || statusCode == 401) {
+                    throw new RecoverableDistributionException("not enough rights for " + distributionEndpoint.getUri(), e);
+                }
+                throw new DistributionException(e);
+            } catch (Exception e) {
+                throw new DistributionException(e);
+
             }
         }
-
     }
 
     @Nullable
@@ -163,7 +169,7 @@ public class SimpleHttpDistributionTrans
             executor = executor.auth(new HttpHost(distributionEndpoint.getUri().getHost(), distributionEndpoint.getUri().getPort()),
                     credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD)).authPreemptive(
                     new HttpHost(distributionEndpoint.getUri().getHost(), distributionEndpoint.getUri().getPort()));
-            log.debug("AUTH user={}, endpoint={}", secret.asCredentialsMap().get(USERNAME), distributionEndpoint.getUri());
+            log.debug("authenticate user={}, endpoint={}", secret.asCredentialsMap().get(USERNAME), distributionEndpoint.getUri());
         }
         return executor;
     }