You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/02 18:13:36 UTC

[camel] branch master updated: CAMEL-11984: AggregationStrategy - Let EIPs inject CamelContext if CamelContextAware custom aggregation strategy to allow access to CamelContext during start/stop logic. This fixes CAMEL-11983: Fixed thread-safety issue.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 22935eb  CAMEL-11984: AggregationStrategy - Let EIPs inject CamelContext if CamelContextAware custom aggregation strategy to allow access to CamelContext during start/stop logic. This fixes CAMEL-11983: Fixed thread-safety issue.
22935eb is described below

commit 22935eb078182b2226999aa4cf9269bf21daa007
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Nov 2 19:08:25 2017 +0100

    CAMEL-11984: AggregationStrategy - Let EIPs inject CamelContext if CamelContextAware custom aggregation strategy to allow access to CamelContext during start/stop logic. This fixes CAMEL-11983: Fixed thread-safety issue.
---
 .../java/org/apache/camel/processor/Enricher.java  |   3 +
 .../apache/camel/processor/MulticastProcessor.java |   5 +
 .../org/apache/camel/processor/PollEnricher.java   |   3 +
 .../processor/aggregate/AggregateProcessor.java    |   4 +
 .../processor/aggregate/AggregationStrategy.java   |   6 +-
 .../ShareUnitOfWorkAggregationStrategy.java        |  16 +++-
 .../util/toolbox/XsltAggregationStrategy.java      |  89 +++++++++++-------
 .../AggregationStrategyLifecycleTest.java          | 102 +++++++++++++++++++++
 8 files changed, 192 insertions(+), 36 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 1f77111..be85b4b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -341,6 +341,9 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware,
         if (aggregationStrategy == null) {
             aggregationStrategy = defaultAggregationStrategy();
         }
+        if (aggregationStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+        }
 
         if (producerCache == null) {
             if (cacheSize < 0) {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 896372b..f924561 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
@@ -1164,6 +1165,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
         }
+        if (aggregationStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+        }
+
         ServiceHelper.startServices(aggregationStrategy, processors);
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index d83d964..61a00d5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -384,6 +384,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
                 LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
             }
         }
+        if (aggregationStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
+        }
         ServiceHelper.startServices(consumerCache, aggregationStrategy);
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index fe422da..4515fa3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -1306,6 +1307,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         if (strategy instanceof DelegateAggregationStrategy) {
             strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
         }
+        if (strategy instanceof CamelContextAware) {
+            ((CamelContextAware) strategy).setCamelContext(camelContext);
+        }
         if (strategy instanceof PreCompletionAwareAggregationStrategy) {
             preCompletion = true;
             LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
index 145ae80..7b8c8af 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
@@ -42,7 +42,11 @@ import org.apache.camel.Exchange;
  * If an implementation also implements {@link org.apache.camel.Service} then any <a href="http://camel.apache.org/eip">EIP</a>
  * that allowing configuring a {@link AggregationStrategy} will invoke the {@link org.apache.camel.Service#start()}
  * and {@link org.apache.camel.Service#stop()} to control the lifecycle aligned with the EIP itself.
- * 
+ * <p/>
+ * If an implementation also implements {@link org.apache.camel.CamelContextAware} then any <a href="http://camel.apache.org/eip">EIP</a>
+ * that allows configuring an {@link AggregationStrategy} will inject the {@link org.apache.camel.CamelContext} prior
+ * to using the aggregation strategy.
+ *
  * @version 
  */
 public interface AggregationStrategy {
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index c7537f6..6c94730 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.processor.aggregate;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 
 import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErrorHandler;
 
@@ -28,7 +32,7 @@ import static org.apache.camel.util.ExchangeHelper.hasExceptionBeenHandledByErro
  * <p/>
  * This strategy is <b>not</b> intended for end users to use.
  */
-public final class ShareUnitOfWorkAggregationStrategy implements AggregationStrategy, DelegateAggregationStrategy {
+public final class ShareUnitOfWorkAggregationStrategy extends ServiceSupport implements AggregationStrategy, DelegateAggregationStrategy {
 
     private final AggregationStrategy strategy;
 
@@ -78,4 +82,14 @@ public final class ShareUnitOfWorkAggregationStrategy implements AggregationStra
     public String toString() {
         return "ShareUnitOfWorkAggregationStrategy";
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(strategy);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopAndShutdownServices(strategy);
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
index 057503a..792562e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/util/toolbox/XsltAggregationStrategy.java
@@ -17,7 +17,7 @@
 package org.apache.camel.util.toolbox;
 
 import java.io.IOException;
-
+import java.util.concurrent.RejectedExecutionException;
 import javax.xml.transform.Source;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.TransformerFactory;
@@ -26,12 +26,14 @@ import javax.xml.transform.URIResolver;
 import org.w3c.dom.Document;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.xml.XsltBuilder;
 import org.apache.camel.builder.xml.XsltUriResolver;
 import org.apache.camel.component.xslt.XsltEndpoint;
 import org.apache.camel.component.xslt.XsltOutput;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,15 +62,15 @@ import org.slf4j.LoggerFactory;
  * changed through {@link #setPropertyName(String)}.
  * <p>
  * Some code bits have been copied from the {@link org.apache.camel.component.xslt.XsltEndpoint}.
- *
  */
-public class XsltAggregationStrategy implements AggregationStrategy {
+public class XsltAggregationStrategy extends ServiceSupport implements AggregationStrategy, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(XsltAggregationStrategy.class);
     private static final String DEFAULT_PROPERTY_NAME = "new-exchange";
 
     private volatile XsltBuilder xslt;
     private volatile URIResolver uriResolver;
+    private CamelContext camelContext;
 
     private String propertyName;
     private String xslFile;
@@ -85,6 +87,16 @@ public class XsltAggregationStrategy implements AggregationStrategy {
     }
 
     @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         // guard against unlikely NPE
         if (newExchange == null) {
@@ -97,17 +109,14 @@ public class XsltAggregationStrategy implements AggregationStrategy {
             return newExchange;
         }
 
-        try {
-            // initialize if this is the first call
-            if (xslt == null) {
-                initialize(oldExchange.getContext());
-            }
+        if (!isRunAllowed()) {
+            throw new RejectedExecutionException();
+        }
 
+        try {
             oldExchange.setProperty(propertyName, newExchange.getIn().getBody(Document.class));
             xslt.process(oldExchange);
-
             return oldExchange;
-
         } catch (Throwable e) {
             oldExchange.setException(e);
         }
@@ -139,31 +148,9 @@ public class XsltAggregationStrategy implements AggregationStrategy {
         this.propertyName = propertyName;
     }
 
+    @Deprecated
     protected void initialize(CamelContext context) throws Exception {
-        // set the default property name if not set
-        this.propertyName = ObjectHelper.isNotEmpty(propertyName) ? propertyName : DEFAULT_PROPERTY_NAME;
-
-        // initialize the XsltBuilder
-        this.xslt = context.getInjector().newInstance(XsltBuilder.class);
-
-        if (transformerFactoryClass != null) {
-            Class<?> factoryClass = context.getClassResolver().resolveMandatoryClass(transformerFactoryClass,
-                    XsltAggregationStrategy.class.getClassLoader());
-            TransformerFactory factory = (TransformerFactory) context.getInjector().newInstance(factoryClass);
-            xslt.getConverter().setTransformerFactory(factory);
-        }
-
-        if (uriResolver == null) {
-            uriResolver = new XsltUriResolver(context, xslFile);
-        }
-
-        xslt.setUriResolver(uriResolver);
-        xslt.setFailOnNullBody(true);
-        xslt.transformerCacheSize(0);
-        xslt.setAllowStAX(true);
-
-        configureOutput(xslt, output.name());
-        loadResource(xslFile);
+        this.camelContext = context;
     }
 
     protected void configureOutput(XsltBuilder xslt, String output) throws Exception {
@@ -231,4 +218,38 @@ public class XsltAggregationStrategy implements AggregationStrategy {
         return this;
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+        // set the default property name if not set
+        this.propertyName = ObjectHelper.isNotEmpty(propertyName) ? propertyName : DEFAULT_PROPERTY_NAME;
+
+        // initialize the XsltBuilder
+        this.xslt = camelContext.getInjector().newInstance(XsltBuilder.class);
+
+        if (transformerFactoryClass != null) {
+            Class<?> factoryClass = camelContext.getClassResolver().resolveMandatoryClass(transformerFactoryClass,
+                XsltAggregationStrategy.class.getClassLoader());
+            TransformerFactory factory = (TransformerFactory) camelContext.getInjector().newInstance(factoryClass);
+            xslt.getConverter().setTransformerFactory(factory);
+        }
+
+        if (uriResolver == null) {
+            uriResolver = new XsltUriResolver(camelContext, xslFile);
+        }
+
+        xslt.setUriResolver(uriResolver);
+        xslt.setFailOnNullBody(true);
+        xslt.transformerCacheSize(0);
+        xslt.setAllowStAX(true);
+
+        configureOutput(xslt, output.name());
+        loadResource(xslFile);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
 }
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java
new file mode 100644
index 0000000..bd56f03
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregationStrategyLifecycleTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+
+public class AggregationStrategyLifecycleTest extends ContextTestSupport {
+
+    private MyCompletionStrategy strategy = new MyCompletionStrategy();
+
+    public void testAggregateLifecycle() throws Exception {
+        assertTrue("Should be started", strategy.isStarted());
+        assertSame(context, strategy.getCamelContext());
+
+        MockEndpoint result = getMockEndpoint("mock:aggregated");
+        result.expectedBodiesReceived("A+B+C");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+
+        assertMockEndpointsSatisfied();
+
+        context.stop();
+
+        assertTrue("Should be stopped", strategy.isStopped());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), strategy).completionSize(3)
+                    .to("mock:aggregated");
+            }
+        };
+    }
+
+    private final class MyCompletionStrategy extends ServiceSupport implements AggregationStrategy, CamelContextAware {
+
+        private CamelContext camelContext;
+        private String separator;
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            String body = oldExchange.getIn().getBody(String.class) + separator
+                + newExchange.getIn().getBody(String.class);
+            oldExchange.getIn().setBody(body);
+            return oldExchange;
+        }
+
+        @Override
+        public CamelContext getCamelContext() {
+            return camelContext;
+        }
+
+        @Override
+        public void setCamelContext(CamelContext camelContext) {
+            this.camelContext = camelContext;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            ObjectHelper.notNull(camelContext, "CamelContext");
+
+            separator = "+";
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // noop
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.