You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by mp...@apache.org on 2016/01/18 16:21:08 UTC
svn commit: r1725287 - in
/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution:
agent/impl/ packaging/impl/ servlet/ transport/impl/
Author: mpetria
Date: Mon Jan 18 15:21:08 2016
New Revision: 1725287
URL: http://svn.apache.org/viewvc?rev=1725287&view=rev
Log:
SLING-5436: do not move distribution packages to error queues on connection errors
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Mon Jan 18 15:21:08 2016
@@ -45,38 +45,40 @@ import org.apache.sling.distribution.Dis
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
-import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.common.RecoverableDistributionException;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.component.impl.SettingsUtils;
import org.apache.sling.distribution.event.DistributionEventTopics;
import org.apache.sling.distribution.event.impl.DistributionEventFactory;
import org.apache.sling.distribution.impl.CompositeDistributionResponse;
+import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.impl.SimpleDistributionResponse;
import org.apache.sling.distribution.log.DistributionLog;
import org.apache.sling.distribution.log.impl.DefaultDistributionLog;
+import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
+import org.apache.sling.distribution.serialization.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageExporter;
import org.apache.sling.distribution.packaging.DistributionPackageImporter;
-import org.apache.sling.distribution.packaging.DistributionPackageProcessor;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.queue.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
+
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.DistributionQueueWrapper;
-import org.apache.sling.distribution.serialization.DistributionPackage;
import org.apache.sling.distribution.trigger.DistributionRequestHandler;
import org.apache.sling.distribution.trigger.DistributionTrigger;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.apache.sling.jcr.api.SlingRepository;
import org.apache.sling.jcr.resource.JcrResourceConstants;
+
/**
* Basic implementation of a {@link org.apache.sling.distribution.agent.DistributionAgent}
*/
@@ -176,7 +178,7 @@ public class SimpleDistributionAgent imp
ResourceResolver agentResourceResolver = null;
- final String requestId = "DSTRQ" + nextRequestId.incrementAndGet();
+ final String requestId = "DSTRQ"+ nextRequestId.incrementAndGet();
String callingUser = resourceResolver.getUserID();
try {
@@ -194,7 +196,7 @@ public class SimpleDistributionAgent imp
boolean silent = DistributionRequestType.PULL.equals(distributionRequest.getRequestType());
- log.info(silent, "START {} {} paths={}, user={}", new Object[]{ requestId,
+ log.info(silent, "REQUEST-START {}: {} paths={}, user={}", new Object[]{ requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser});
// check permissions
@@ -205,7 +207,8 @@ public class SimpleDistributionAgent imp
// export packages
CompositeDistributionResponse distributionResponse = exportPackages(agentResourceResolver, distributionRequest, callingUser, requestId);
- log.debug("STARTED {} {} paths={}, success={}, state={}, exportTime={}ms, noPackages={}, size={}B, noQueues={}", new Object[]{
+
+ log.debug("REQUEST-STARTED {}: {} paths={}, success={}, state={}, exportTime={}ms, noPackages={}, size={}B, noQueues={}", new Object[]{
requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(),
distributionResponse.isSuccessful(), distributionResponse.getState(),
@@ -216,7 +219,7 @@ public class SimpleDistributionAgent imp
return distributionResponse;
} catch (DistributionException e) {
- log.error("FAILED {} {} paths={}, user={}, message={}", new Object[]{requestId,
+ log.error("REQUEST-FAIL {}: {} paths={}, user={}, message={}", new Object[]{requestId,
distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser, e.getMessage()});
throw e;
} finally {
@@ -273,11 +276,12 @@ public class SimpleDistributionAgent imp
distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
}
- log.debug("SCHEDULED packageId={}, info={}", distributionPackage.getId(), distributionPackage.getInfo());
+ log.debug("PACKAGE-QUEUED {}: packageId={}, info={}", requestId, distributionPackage.getId(), distributionPackage.getInfo());
return distributionResponses;
}
+
@Nonnull
public Set<String> getQueueNames() {
Set<String> queueNames = new TreeSet<String>();
@@ -413,7 +417,7 @@ public class SimpleDistributionAgent imp
}
private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException {
- boolean success = false;
+ boolean removeItemFromQueue = false;
ResourceResolver agentResourceResolver = null;
DistributionPackage distributionPackage = null;
DistributionQueueItem queueItem = queueEntry.getItem();
@@ -437,48 +441,46 @@ public class SimpleDistributionAgent imp
DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry);
- if (processPackage(agentResourceResolver, distributionPackage)) {
- success = true;
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ try {
+ distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage);
generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage);
- } else if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
- success = reEnqueuePackage(distributionPackage);
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
- }
-
- final long endTime = System.currentTimeMillis();
+ removeItemFromQueue = true;
+ final long endTime = System.currentTimeMillis();
- log.info("END[{}] {} {} paths={}, time={}ms, importTime={}ms, size={}B", new Object[]{
- queueName, requestId,
- requestType, paths,
- endTime - globalStartTime, endTime - startTime,
- packageSize
- });
+ log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, time={}ms, importTime={}ms, size={}B", new Object[] {
+ queueName, requestId,
+ requestType, paths,
+ endTime - globalStartTime, endTime - startTime,
+ packageSize
+ });
+ } catch (RecoverableDistributionException e) {
+ log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage());
+ log.debug("could not deliver package {}", distributionPackage.getId(), e);
+ } catch (Throwable e) {
+ log.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e);
+
+ if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) {
+ removeItemFromQueue = reEnqueuePackage(distributionPackage);
+ log.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId());
+ }
+ }
} else {
- success = true; // return success if package does not exist in order to clear the queue.
+ removeItemFromQueue = true; // return success if package does not exist in order to clear the queue.
log.error("distribution package with id {} does not exist. the package will be skipped.", queueItem.getId());
}
} finally {
- ungetAgentResourceResolver(agentResourceResolver);
DistributionPackageUtils.closeSafely(distributionPackage);
- }
- return success;
- }
-
- private boolean processPackage(ResourceResolver resourceResolver, DistributionPackage distributionPackage) {
- try {
- distributionPackageImporter.importPackage(resourceResolver, distributionPackage);
- } catch (RecoverableDistributionException e) {
- log.debug("could not deliver package {}", distributionPackage.getId(), e);
- return false;
- } catch (DistributionException e) {
- log.error("could not deliver package {}", distributionPackage.getId(), e);
- return false;
+ if (removeItemFromQueue) {
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ }
+ ungetAgentResourceResolver(agentResourceResolver);
}
- return true;
+ // return true if item should be removed from queue
+ return removeItemFromQueue;
}
+
private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
if (errorQueueStrategy == null) {
@@ -492,7 +494,6 @@ public class SimpleDistributionAgent imp
return false;
}
- log.debug("distribution package {} was reenqueued", distributionPackage.getId());
return true;
}
@@ -530,6 +531,7 @@ public class SimpleDistributionAgent imp
DistributionUtils.safelyLogout(resourceResolver);
}
}
+
}
private void generatePackageEvent(String topic, DistributionPackage... distributionPackages) {
@@ -600,16 +602,16 @@ public class SimpleDistributionAgent imp
DistributionQueueItem queueItem = queueEntry.getItem();
try {
- log.debug("PROCESSING[{}] item={}", queueName, queueItem);
+ log.debug("[{}] ITEM-PROCESS processing item={}", queueName, queueItem);
boolean success = processQueueItem(queueName, queueEntry);
- log.debug("PROCESSED[{}] item={}, status={}", queueName, queueItem, success);
+ log.debug("[{}] ITEM-PROCESSED item={}, status={}", queueName, queueItem, success);
return success;
} catch (Throwable t) {
- log.error("FAILED[{}] item{}", queueName, queueItem, t);
+ log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t);
return false;
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java Mon Jan 18 15:21:08 2016
@@ -72,16 +72,23 @@ public class DistributionPackageUtils {
* @param queueName the name of the queue from which it should be eventually released
*/
public static void releaseOrDelete(DistributionPackage distributionPackage, String queueName) {
- if (distributionPackage instanceof SharedDistributionPackage) {
- if (queueName != null) {
- ((SharedDistributionPackage) distributionPackage).release(queueName);
- log.debug("package {} released from queue {}", distributionPackage.getId(), queueName);
+ if (distributionPackage == null) {
+ return;
+ }
+ try {
+ if (distributionPackage instanceof SharedDistributionPackage) {
+ if (queueName != null) {
+ ((SharedDistributionPackage) distributionPackage).release(queueName);
+ log.debug("package {} released from queue {}", distributionPackage.getId(), queueName);
+ } else {
+ log.error("package {} cannot be released from null queue", distributionPackage.getId());
+ }
} else {
- log.error("package {} cannot be released from null queue", distributionPackage.getId());
+ deleteSafely(distributionPackage);
+ log.debug("package {} deleted", distributionPackage.getId());
}
- } else {
- deleteSafely(distributionPackage);
- log.debug("package {} deleted", distributionPackage.getId());
+ } catch (Throwable t) {
+ log.error("cannot release package {}", t);
}
}
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageImporterServlet.java Mon Jan 18 15:21:08 2016
@@ -56,13 +56,17 @@ public class DistributionPackageImporter
InputStream stream = request.getInputStream();
ResourceResolver resourceResolver = request.getResourceResolver();
try {
+ if (request.getParameter("forceError") != null) {
+ throw new Exception("manually forced error");
+ }
+
DistributionPackageInfo distributionPackageInfo = distributionPackageImporter.importStream(resourceResolver, stream);
log.info("Package {} imported successfully", distributionPackageInfo);
ServletJsonUtils.writeJson(response, 200, "package imported successfully");
} catch (final Throwable e) {
- ServletJsonUtils.writeJson(response, 400, "an unexpected error has occurred during distribution import");
+ ServletJsonUtils.writeJson(response, 500, "an unexpected error has occurred during distribution import");
log.error("Error during distribution import", e);
} finally {
long end = System.currentTimeMillis();
Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java?rev=1725287&r1=1725286&r2=1725287&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java (original)
+++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java Mon Jan 18 15:21:08 2016
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHost;
+import org.apache.http.client.HttpResponseException;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
@@ -104,15 +105,20 @@ public class SimpleHttpDistributionTrans
}
Content content = response.returnContent();
- log.debug("DELIVERED packageId={}, endpoint={}", distributionPackage.getId(), distributionEndpoint.getUri());
+ log.debug("delivered packageId={}, endpoint={}", distributionPackage.getId(), distributionEndpoint.getUri());
} catch (HttpHostConnectException e) {
- log.debug("could not connect to {} - retrying", distributionEndpoint.getUri());
- throw new RecoverableDistributionException(e);
- } catch (Exception ex) {
- throw new DistributionException(ex);
+ throw new RecoverableDistributionException("endpoint not available " + distributionEndpoint.getUri(), e);
+ } catch (HttpResponseException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode == 404 || statusCode == 401) {
+ throw new RecoverableDistributionException("not enough rights for " + distributionEndpoint.getUri(), e);
+ }
+ throw new DistributionException(e);
+ } catch (Exception e) {
+ throw new DistributionException(e);
+
}
}
-
}
@Nullable
@@ -163,7 +169,7 @@ public class SimpleHttpDistributionTrans
executor = executor.auth(new HttpHost(distributionEndpoint.getUri().getHost(), distributionEndpoint.getUri().getPort()),
credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD)).authPreemptive(
new HttpHost(distributionEndpoint.getUri().getHost(), distributionEndpoint.getUri().getPort()));
- log.debug("AUTH user={}, endpoint={}", secret.asCredentialsMap().get(USERNAME), distributionEndpoint.getUri());
+ log.debug("authenticate user={}, endpoint={}", secret.asCredentialsMap().get(USERNAME), distributionEndpoint.getUri());
}
return executor;
}