You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/02 18:13:36 UTC
[camel] branch master updated: CAMEL-11984: AggregationStrategy -
Let EIPs inject CamelContext if CamelContextAware custom aggregation
strategy to allow access to CamelContext during start/stop logic. This
fixes CAMEL-11983: Fixed thread-safety issue.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 22935eb CAMEL-11984: AggregationStrategy - Let EIPs inject CamelContext if CamelContextAware custom aggregation strategy to allow access to CamelContext during start/stop logic. This fixes CAMEL-11983: Fixed thread-safety issue.
22935eb is described below
commit 22935eb078182b2226999aa4cf9269bf21daa007
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Nov 2 19:08:25 2017 +0100
CAMEL-11984: AggregationStrategy - Let EIPs inject CamelContext if CamelContextAware custom aggregation strategy to allow access to CamelContext during start/stop logic. This fixes CAMEL-11983: Fixed thread-safety issue.
---
.../java/org/apache/camel/processor/Enricher.java | 3 +
.../apache/camel/processor/MulticastProcessor.java | 5 +
.../org/apache/camel/processor/PollEnricher.java | 3 +
.../processor/aggregate/AggregateProcessor.java | 4 +
.../processor/aggregate/AggregationStrategy.java | 6 +-
.../ShareUnitOfWorkAggregationStrategy.java | 16 +++-
.../util/toolbox/XsltAggregationStrategy.java | 89 +++++++++++-------
.../AggregationStrategyLifecycleTest.java | 102 +++++++++++++++++++++
8 files changed, 192 insertions(+), 36 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 1f77111..be85b4b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -341,6 +341,9 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware,
if (aggregationStrategy == null) {
aggregationStrategy = defaultAggregationStrategy();
}
+ if (aggregationStrategy instanceof CamelContextAware) {
+ ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+ }
if (producerCache == null) {
if (cacheSize < 0) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 896372b..f924561 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
@@ -1164,6 +1165,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
String name = getClass().getSimpleName() + "-AggregateTask";
aggregateExecutorService = createAggregateExecutorService(name);
}
+ if (aggregationStrategy instanceof CamelContextAware) {
+ ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+ }
+
ServiceHelper.startServices(aggregationStrategy, processors);
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index d83d964..61a00d5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -384,6 +384,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
}
}
+ if (aggregationStrategy instanceof CamelContextAware) {
+ ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+ }
ServiceHelper.startServices(consumerCache, aggregationStrategy);
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index fe422da..4515fa3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -1306,6 +1307,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
if (strategy instanceof DelegateAggregationStrategy) {
strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
}
+ if (strategy instanceof CamelContextAware) {
+ ((CamelContextAware) strategy).setCamelContext(camelContext);
+ }
if (strategy instanceof PreCompletionAwareAggregationStrategy) {
preCompletion = true;
LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
index 145ae80..7b8c8af 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
@@ -42,7 +42,11 @@ import org.apache.camel.Exchange;
* If an implementation also implements {@link org.apache.camel.Service} then any <a href="http://camel.apache.org/eip">EIP</a>
* that allowing configuring a {@link AggregationStrategy} will invoke the {@link org.apache.camel.Service#start()}
* and {@link org.apache.camel.Service#stop()} to control the lifecycle aligned with the EIP itself.
- *
+ * <p/>
+ * If an implementation also implements {@link org.apache.camel.CamelContextAware} then any <a href="http://camel.apache.org/eip">EIP</a>
+ * that allows configuring an {@link AggregationStrategy} will inject the {@link org.apache.camel.CamelContext} prior
+ * to using the aggregation strategy.
+ *
* @version
*/
public interface AggregationStrategy {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index c7537f6..6c94730 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel.processor.aggregate;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErrorHandler;
@@ -28,7 +32,7 @@ import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErro
* <p/>
* This strategy is <b>not</b> intended for end users to use.
*/
-public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy, DelegateAggregationStrategy {
+public final class ShareUnitOfWorkAggregationStrategy extends ServiceSupport implements AggregationStrategy, DelegateAggregationStrategy {
private final AggregationStrategy strategy;
@@ -78,4 +82,14 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
public String toString() {
return "ShareUnitOfWorkAggregationStrategy";
}
+
+ @Override
+ protected void doStart() throws Exception { // start hook of this wrapper strategy
+ ServiceHelper.startService(strategy); // hands the wrapped delegate strategy to ServiceHelper so it is started together with the wrapper (presumably ignored when the delegate is not a Service - TODO confirm)
+ }
+
+ @Override
+ protected void doStop() throws Exception { // stop hook of this wrapper strategy
+ ServiceHelper.stopAndShutdownServices(strategy); // NOTE(review): judging by the helper name the delegate is both stopped and shut down during stop - confirm that is intended
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
index 057503a..792562e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
@@ -17,7 +17,7 @@
package org.apache.camel.util.toolbox;
import java.io.IOException;
-
+import java.util.concurrent.RejectedExecutionException;
import javax.xml.transform.Source;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
@@ -26,12 +26,14 @@ import javax.xml.transform.URIResolver;
import org.w3c.dom.Document;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.builder.xml.XsltBuilder;
import org.apache.camel.builder.xml.XsltUriResolver;
import org.apache.camel.component.xslt.XsltEndpoint;
import org.apache.camel.component.xslt.XsltOutput;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,15 +62,15 @@ import org.slf4j.LoggerFactory;
* changed through {@link #setPropertyName(String)}.
* <p>
* Some code bits have been copied from the {@link org.apache.camel.component.xslt.XsltEndpoint}.
- *
*/
-public class XsltAggregationStrategy implements AggregationStrategy {
+public class XsltAggregationStrategy extends ServiceSupport implements AggregationStrategy, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(XsltAggregationStrategy.class);
private static final String DEFAULT_PROPERTY_NAME = "new-exchange";
private volatile XsltBuilder xslt;
private volatile URIResolver uriResolver;
+ private CamelContext camelContext;
private String propertyName;
private String xslFile;
@@ -85,6 +87,16 @@ public class XsltAggregationStrategy implements AggregationStrategy {
}
@Override
+ public CamelContext getCamelContext() { // CamelContextAware accessor
+ return camelContext; // value last stored by setCamelContext(...) or initialize(...); null if neither has been called
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) { // CamelContextAware injection point
+ this.camelContext = camelContext; // kept for doStart(), which checks it with ObjectHelper.notNull before building the XsltBuilder
+ }
+
+ @Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// guard against unlikely NPE
if (newExchange == null) {
@@ -97,17 +109,14 @@ public class XsltAggregationStrategy implements AggregationStrategy {
return newExchange;
}
- try {
- // initialize if this is the first call
- if (xslt == null) {
- initialize(oldExchange.getContext());
- }
+ if (!isRunAllowed()) {
+ throw new RejectedExecutionException();
+ }
+ try {
oldExchange.setProperty(propertyName, newExchange.getIn().getBody(Document.class));
xslt.process(oldExchange);
-
return oldExchange;
-
} catch (Throwable e) {
oldExchange.setException(e);
}
@@ -139,31 +148,9 @@ public class XsltAggregationStrategy implements AggregationStrategy {
this.propertyName = propertyName;
}
+ @Deprecated
protected void initialize(CamelContext context) throws Exception {
- // set the default property name if not set
- this.propertyName = ObjectHelper.isNotEmpty(propertyName) ? propertyName : DEFAULT_PROPERTY_NAME;
-
- // initialize the XsltBuilder
- this.xslt = context.getInjector().newInstance(XsltBuilder.class);
-
- if (transformerFactoryClass != null) {
- Class<?> factoryClass = context.getClassResolver().resolveMandatoryClass(transformerFactoryClass,
- XsltAggregationStrategy.class.getClassLoader());
- TransformerFactory factory = (TransformerFactory) context.getInjector().newInstance(factoryClass);
- xslt.getConverter().setTransformerFactory(factory);
- }
-
- if (uriResolver == null) {
- uriResolver = new XsltUriResolver(context, xslFile);
- }
-
- xslt.setUriResolver(uriResolver);
- xslt.setFailOnNullBody(true);
- xslt.transformerCacheSize(0);
- xslt.setAllowStAX(true);
-
- configureOutput(xslt, output.name());
- loadResource(xslFile);
+ this.camelContext = context; // deprecated entry point now only records the context; the XSLT setup formerly done here is performed in doStart()
}
protected void configureOutput(XsltBuilder xslt, String output) throws Exception {
@@ -231,4 +218,38 @@ public class XsltAggregationStrategy implements AggregationStrategy {
return this;
}
+ @Override
+ protected void doStart() throws Exception { // start hook: builds and configures the XsltBuilder once, before aggregate(...) uses it
+ ObjectHelper.notNull(camelContext, "CamelContext", this); // guard: a CamelContext must have been injected before start (presumably throws otherwise - helper not shown here)
+
+ // fall back to DEFAULT_PROPERTY_NAME ("new-exchange") when no property name has been configured
+ this.propertyName = ObjectHelper.isNotEmpty(propertyName) ? propertyName : DEFAULT_PROPERTY_NAME;
+
+ // create the XsltBuilder through the injector of the CamelContext
+ this.xslt = camelContext.getInjector().newInstance(XsltBuilder.class);
+
+ if (transformerFactoryClass != null) { // optional custom TransformerFactory, given by class name
+ Class<?> factoryClass = camelContext.getClassResolver().resolveMandatoryClass(transformerFactoryClass,
+ XsltAggregationStrategy.class.getClassLoader());
+ TransformerFactory factory = (TransformerFactory) camelContext.getInjector().newInstance(factoryClass);
+ xslt.getConverter().setTransformerFactory(factory);
+ }
+
+ if (uriResolver == null) { // keep a resolver that is already set; otherwise create one from the context and xslFile
+ uriResolver = new XsltUriResolver(camelContext, xslFile);
+ }
+
+ xslt.setUriResolver(uriResolver);
+ xslt.setFailOnNullBody(true);
+ xslt.transformerCacheSize(0);
+ xslt.setAllowStAX(true);
+
+ configureOutput(xslt, output.name()); // passes the name of the configured output enum constant
+ loadResource(xslFile); // presumably loads the stylesheet referenced by xslFile into the builder - helper not shown, TODO confirm
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop: nothing is released here; the xslt builder assigned in doStart() is left as is
+ }
}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java
new file mode 100644
index 0000000..bd56f03
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+
+public class AggregationStrategyLifecycleTest extends ContextTestSupport { // checks that a strategy used by the aggregate EIP is started, receives the CamelContext, and is stopped with the context
+
+ private MyCompletionStrategy strategy = new MyCompletionStrategy(); // same instance is handed to the route in createRouteBuilder()
+
+ public void testAggregateLifecycle() throws Exception {
+ assertTrue("Should be started", strategy.isStarted()); // asserted before any message is sent (assumes the test support has already started the context - TODO confirm)
+ assertSame(context, strategy.getCamelContext()); // the injected context must be the test's own context instance
+
+ MockEndpoint result = getMockEndpoint("mock:aggregated");
+ result.expectedBodiesReceived("A+B+C"); // "+" is the separator assigned in MyCompletionStrategy.doStart()
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123); // all three messages carry the same "id" header value, which the route uses as correlation expression
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+ assertMockEndpointsSatisfied();
+
+ context.stop(); // stop explicitly so the stopped state of the strategy can be asserted below
+
+ assertTrue("Should be stopped", strategy.isStopped());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), strategy).completionSize(3) // correlate on the "id" header using the strategy under test; completion size matches the three messages sent by the test
+ .to("mock:aggregated");
+ }
+ };
+ }
+
+ private final class MyCompletionStrategy extends ServiceSupport implements AggregationStrategy, CamelContextAware { // NOTE(review): does not reference the enclosing test instance, so it could be a static nested class
+
+ private CamelContext camelContext; // set through setCamelContext(...)
+ private String separator; // null until doStart() assigns it
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) { // no previous exchange to merge with: the new exchange becomes the result
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class) + separator
+ + newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(body); // the old exchange is updated in place and returned
+ return oldExchange;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "CamelContext"); // expects setCamelContext(...) to have been called before start
+
+ separator = "+"; // assigned only here, so a correct "A+B+C" body also shows that start ran before aggregation
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].