You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/12/13 09:15:52 UTC

[camel] branch camel-2.20.x updated (51de426 -> a6d5e1b)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 51de426  CAMEL-12069: ActiveMQ/JMS component: transferExchange option does not transfer exchange properties anymore
     new 5f49d5a  CAMEL-12056: Introduce NotifyBuilder.destroy()
     new 7115cee  Java 7'isms for NotifyBuilder
     new 1c708d0  CAMEL-12075: aggregator thread does not finish in iterating splitter
     new a6d5e1b  CAMEL-12075: Polished. This closes #2142

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/builder/NotifyBuilder.java    |  40 ++++--
 .../apache/camel/processor/MulticastProcessor.java | 106 ++++++++-------
 .../apache/camel/builder/NotifyBuilderTest.java    |  39 ++++++
 ...terParallelRuntimeExceptionInHasNextOrNext.java | 150 +++++++++++++++++++++
 4 files changed, 276 insertions(+), 59 deletions(-)
 create mode 100644 camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].

[camel] 03/04: CAMEL-12075: aggregator thread does not finish in iterating splitter

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1c708d0c6340d51c65d6d1295311d3b39761e7a4
Author: Franz Forsthofer <fr...@sap.com>
AuthorDate: Mon Dec 11 09:06:59 2017 +0100

    CAMEL-12075: aggregator thread does not finish in iterating splitter
---
 .../apache/camel/processor/MulticastProcessor.java | 102 +++++++-------
 ...terParallelRuntimeExceptionInHasNextOrNext.java | 155 +++++++++++++++++++++
 2 files changed, 209 insertions(+), 48 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index f924561..90a6f7b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -301,56 +301,62 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
             final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
 
             LOG.trace("Starting to submit parallel tasks");
-
-            while (it.hasNext()) {
-                final ProcessorExchangePair pair = it.next();
-                // in case the iterator returns null then continue to next
-                if (pair == null) {
-                    continue;
-                }
-
-                final Exchange subExchange = pair.getExchange();
-                updateNewExchange(subExchange, total.intValue(), pairs, it);
-
-                completion.submit(new Callable<Exchange>() {
-                    public Exchange call() throws Exception {
-                        // start the aggregation task at this stage only in order not to pile up too many threads
-                        if (aggregationTaskSubmitted.compareAndSet(false, true)) {
-                            // but only submit the aggregation task once
-                            aggregateExecutorService.submit(aggregateOnTheFlyTask);
-                        }
-
-                        if (!running.get()) {
-                            // do not start processing the task if we are not running
-                            return subExchange;
-                        }
-
-                        try {
-                            doProcessParallel(pair);
-                        } catch (Throwable e) {
-                            subExchange.setException(e);
-                        }
-
-                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
-                        Integer number = getExchangeIndex(subExchange);
-                        boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
-                        if (stopOnException && !continueProcessing) {
-                            // signal to stop running
-                            running.set(false);
-                            // throw caused exception
-                            if (subExchange.getException() != null) {
-                                // wrap in exception to explain where it failed
-                                CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
-                                subExchange.setException(cause);
+            
+            try {
+                while (it.hasNext()) {
+                    final ProcessorExchangePair pair = it.next();
+                    // in case the iterator returns null then continue to next
+                    if (pair == null) {
+                        continue;
+                    }
+    
+                    final Exchange subExchange = pair.getExchange();
+                    updateNewExchange(subExchange, total.intValue(), pairs, it);
+    
+                    completion.submit(new Callable<Exchange>() {
+                        public Exchange call() throws Exception {
+                            // start the aggregation task at this stage only in order not to pile up too many threads
+                            if (aggregationTaskSubmitted.compareAndSet(false, true)) {
+                                // but only submit the aggregation task once
+                                aggregateExecutorService.submit(aggregateOnTheFlyTask);
+                            }
+    
+                            if (!running.get()) {
+                                // do not start processing the task if we are not running
+                                return subExchange;
+                            }
+    
+                            try {
+                                doProcessParallel(pair);
+                            } catch (Throwable e) {
+                                subExchange.setException(e);
+                            }
+    
+                            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
+                            Integer number = getExchangeIndex(subExchange);
+                            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
+                            if (stopOnException && !continueProcessing) {
+                                // signal to stop running
+                                running.set(false);
+                                // throw caused exception
+                                if (subExchange.getException() != null) {
+                                    // wrap in exception to explain where it failed
+                                    CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
+                                    subExchange.setException(cause);
+                                }
                             }
+    
+                            LOG.trace("Parallel processing complete for exchange: {}", subExchange);
+                            return subExchange;
                         }
-
-                        LOG.trace("Parallel processing complete for exchange: {}", subExchange);
-                        return subExchange;
-                    }
-                });
-
-                total.incrementAndGet();
+                    });
+    
+                    total.incrementAndGet();
+                }
+            } catch (RuntimeException e) {
+                // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented.
+                // We have to catch the exception here otherwise the aggregator threads would pile up.
+                executionException.set(e);
             }
 
             // signal all tasks has been submitted
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
new file mode 100644
index 0000000..948aada
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTestSupport {
+
+    /**
+     * Tests that only one aggregator thread is created if a RuntimeException in
+     * the hasNext method of a custom iterator occurs.
+     */
+    @Test
+    public void testSplitErrorInHasNext() throws Exception {
+
+        execute("direct:errorInHasNext");
+    }
+
+    /**
+     * Tests that only one aggregator thread is created if a RuntimeException in
+     * the next method of a custom iterator occurs.
+     */
+    @Test
+    public void testSplitErrorInNext() throws Exception {
+
+        execute("direct:errorInNext");
+    }
+
+    private void execute(String from) throws InterruptedException {
+        for (int i = 0; i < 10; i++) {
+            try {
+                template.sendBody(from, "some content");
+            } catch (Exception e) {
+                // expected due to runtime exception in hasNext method
+                assertTrue(e.getMessage().startsWith("Exception occurred"));
+            }
+            assertMockEndpointsSatisfied();
+        }
+        List<Thread> aggregatorThreads = getAggregatorThreads();
+        assertEquals(1, aggregatorThreads.size());
+    }
+
+    private List<Thread> getAggregatorThreads() {
+        List<Thread> result = new ArrayList<>();
+        for (Thread t : Thread.getAllStackTraces().keySet()) {
+            if (t.getName().endsWith("Splitter-AggregateTask")) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:errorInHasNext").split().method(SplitterImpl.class, "errorInHasNext").streaming().parallelProcessing(true).to("mock:split1");
+
+                from("direct:errorInNext").split().method(SplitterImpl.class, "errorInNext").streaming().parallelProcessing(true).to("mock:split2");
+
+            }
+        };
+    }
+
+    public static class SplitterImpl {
+
+        public Iterator<String> errorInHasNext(InputStream request, Exchange exchange) {
+
+            return new CustomIterator(exchange, request, true);
+        }
+
+        public Iterator<String> errorInNext(InputStream request, Exchange exchange) {
+
+            return new CustomIterator(exchange, request, false);
+        }
+
+    }
+
+    static class CustomIterator implements Iterator<String>, Closeable {
+
+        private int index = 0;
+        private InputStream request;
+        private boolean errorInHasNext;
+
+        CustomIterator(Exchange exchange, InputStream request, boolean errorInHasNext) {
+
+            this.request = request;
+            this.errorInHasNext = errorInHasNext;
+
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (index < 7) {
+                return true;
+            }
+            if (errorInHasNext) {
+                throw new RuntimeException("Exception thrown");
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public String next() {
+            index++;
+
+            if (index < 7) {
+                return "<a>" + index + "</a>";
+            }
+            if (!errorInHasNext) {
+                throw new RuntimeException("Exception thrown");
+            } else {
+                return "<a>" + index + "</a>";
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            request.close();
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.

[camel] 04/04: CAMEL-12075: Polished. This closes #2142

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a6d5e1bcf36bfae73aa510a3416f4dc0ef293327
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 13 09:53:05 2017 +0100

    CAMEL-12075: Polished. This closes #2142
---
 .../main/java/org/apache/camel/processor/MulticastProcessor.java  | 8 ++++++--
 .../issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java   | 5 -----
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 90a6f7b..b16c8e1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -353,10 +353,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     
                     total.incrementAndGet();
                 }
-            } catch (RuntimeException e) {
+            } catch (Throwable e) {
                 // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented.
                 // We have to catch the exception here otherwise the aggregator threads would pile up.
-                executionException.set(e);
+                if (e instanceof Exception) {
+                    executionException.set((Exception) e);
+                } else {
+                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
+                }
             }
 
             // signal all tasks has been submitted
diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
index 948aada..5eeafd2 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelRuntimeExceptionInHasNextOrNext.java
@@ -36,7 +36,6 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
      */
     @Test
     public void testSplitErrorInHasNext() throws Exception {
-
         execute("direct:errorInHasNext");
     }
 
@@ -46,7 +45,6 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
      */
     @Test
     public void testSplitErrorInNext() throws Exception {
-
         execute("direct:errorInNext");
     }
 
@@ -79,11 +77,9 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-
                 from("direct:errorInHasNext").split().method(SplitterImpl.class, "errorInHasNext").streaming().parallelProcessing(true).to("mock:split1");
 
                 from("direct:errorInNext").split().method(SplitterImpl.class, "errorInNext").streaming().parallelProcessing(true).to("mock:split2");
-
             }
         };
     }
@@ -109,7 +105,6 @@ public class SplitterParallelRuntimeExceptionInHasNextOrNext extends ContextTest
         private boolean errorInHasNext;
 
         CustomIterator(Exchange exchange, InputStream request, boolean errorInHasNext) {
-
             this.request = request;
             this.errorInHasNext = errorInHasNext;
 

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.

[camel] 02/04: Java 7'isms for NotifyBuilder

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7115cee90cd3c4cd2fec64ecaeb657a8934a32e0
Author: Marc Carter <dr...@fastmail.fm>
AuthorDate: Sun Dec 3 15:34:21 2017 +0000

    Java 7'isms for NotifyBuilder
---
 .../java/org/apache/camel/builder/NotifyBuilder.java  | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
index 0fc7045..1d74bfa 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
@@ -43,6 +43,7 @@ import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StringHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,13 +67,13 @@ public class NotifyBuilder {
     private final EventNotifierSupport eventNotifier;
 
     // the predicates build with this builder
-    private final List<EventPredicateHolder> predicates = new ArrayList<EventPredicateHolder>();
+    private final List<EventPredicateHolder> predicates = new ArrayList<>();
 
     // latch to be used to signal predicates matches
     private CountDownLatch latch = new CountDownLatch(1);
 
     // the current state while building an event predicate where we use a stack and the operation
-    private final List<EventPredicate> stack = new ArrayList<EventPredicate>();
+    private final List<EventPredicate> stack = new ArrayList<>();
     private EventOperation operation;
     private boolean created;
     // keep state of how many wereSentTo we have added
@@ -251,7 +252,7 @@ public class NotifyBuilder {
      * @return the builder
      */
     public ExpressionClauseSupport<NotifyBuilder> filter() {
-        final ExpressionClauseSupport<NotifyBuilder> clause = new ExpressionClauseSupport<NotifyBuilder>(this);
+        final ExpressionClauseSupport<NotifyBuilder> clause = new ExpressionClauseSupport<>(this);
         stack.add(new EventPredicateSupport() {
 
             @Override
@@ -294,7 +295,7 @@ public class NotifyBuilder {
     public NotifyBuilder wereSentTo(final String endpointUri) {
         // insert in start of stack but after the previous wereSentTo
         stack.add(wereSentToIndex++, new EventPredicateSupport() {
-            private ConcurrentMap<String, String> sentTo = new ConcurrentHashMap<String, String>();
+            private ConcurrentMap<String, String> sentTo = new ConcurrentHashMap<>();
 
             @Override
             public boolean isAbstract() {
@@ -1053,7 +1054,7 @@ public class NotifyBuilder {
      * @see #whenExactBodiesDone(Object...)
      */
     public NotifyBuilder whenExactBodiesDone(Object... bodies) {
-        List<Object> bodyList = new ArrayList<Object>();
+        List<Object> bodyList = new ArrayList<>();
         bodyList.addAll(Arrays.asList(bodies));
         return doWhenBodies(bodyList, false, true);
     }
@@ -1278,7 +1279,7 @@ public class NotifyBuilder {
             sb.append(eventPredicateHolder.toString());
         }
         // a crude way of skipping the first invisible operation
-        return ObjectHelper.after(sb.toString(), "().");
+        return StringHelper.after(sb.toString(), "().");
     }
 
     private void doCreate(EventOperation newOperation) {
@@ -1418,7 +1419,7 @@ public class NotifyBuilder {
     }
 
     private enum EventOperation {
-        and, or, not;
+        and, or, not
     }
 
     private interface EventPredicate {
@@ -1543,7 +1544,7 @@ public class NotifyBuilder {
      */
     private final class CompoundEventPredicate implements EventPredicate {
 
-        private List<EventPredicate> predicates = new ArrayList<EventPredicate>();
+        private List<EventPredicate> predicates = new ArrayList<>();
 
         private CompoundEventPredicate(List<EventPredicate> predicates) {
             this.predicates.addAll(predicates);
@@ -1612,7 +1613,7 @@ public class NotifyBuilder {
         public boolean onExchangeSent(Exchange exchange, Endpoint endpoint, long timeTaken) {
             for (EventPredicate predicate : predicates) {
                 boolean answer = predicate.onExchangeSent(exchange, endpoint, timeTaken);
-                LOG.trace("onExchangeSent() {} {} -> {}", new Object[]{endpoint, predicate, answer});
+                LOG.trace("onExchangeSent() {} {} -> {}", endpoint, predicate, answer);
                 if (!answer) {
                     // break at first false
                     return false;

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.

[camel] 01/04: CAMEL-12056: Introduce NotifyBuilder.destroy()

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5f49d5aee13999057a31794031936f398568a9f0
Author: Marc Carter <dr...@fastmail.fm>
AuthorDate: Sun Dec 3 15:14:27 2017 +0000

    CAMEL-12056: Introduce NotifyBuilder.destroy()
---
 .../org/apache/camel/builder/NotifyBuilder.java    | 21 ++++++++++--
 .../apache/camel/builder/NotifyBuilderTest.java    | 39 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
index e5933494..0fc7045 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/NotifyBuilder.java
@@ -39,7 +39,6 @@ import org.apache.camel.management.event.ExchangeCompletedEvent;
 import org.apache.camel.management.event.ExchangeCreatedEvent;
 import org.apache.camel.management.event.ExchangeFailedEvent;
 import org.apache.camel.management.event.ExchangeSentEvent;
-import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -64,7 +63,7 @@ public class NotifyBuilder {
     private final CamelContext context;
 
     // notifier to hook into Camel to listen for events
-    private final EventNotifier eventNotifier;
+    private final EventNotifierSupport eventNotifier;
 
     // the predicates build with this builder
     private final List<EventPredicateHolder> predicates = new ArrayList<EventPredicateHolder>();
@@ -1164,11 +1163,29 @@ public class NotifyBuilder {
      */
     public NotifyBuilder create() {
         doCreate(EventOperation.and);
+        if (eventNotifier.isStopped()) {
+            throw new IllegalStateException("A destroyed NotifyBuilder cannot be re-created.");
+        }
         created = true;
         return this;
     }
 
     /**
+     * De-registers this builder from its {@link CamelContext}.
+     * <p/>
+     * Once destroyed, this instance will not function again.
+     */
+    public void destroy() {
+        context.getManagementStrategy().removeEventNotifier(eventNotifier);
+        try {
+            ServiceHelper.stopService(eventNotifier);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+        created = false;
+    }
+
+    /**
      * Does all the expression match?
      * <p/>
      * This operation will return immediately which means it can be used for testing at this very moment.
diff --git a/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java b/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
index 8c56713..05dd247 100644
--- a/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/builder/NotifyBuilderTest.java
@@ -36,6 +36,45 @@ public class NotifyBuilderTest extends ContextTestSupport {
         }
     }
 
+    public void testDestroyUnregistersBuilder() throws Exception {
+        // Given:
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+        // When:
+        int withReg = context.getManagementStrategy().getEventNotifiers().size();
+        notify.destroy();
+        int afterDestroy = context.getManagementStrategy().getEventNotifiers().size();
+        // Then:
+        assertEquals(withReg - afterDestroy, 1);
+    }
+
+    public void testDestroyResetsBuilder() throws Exception {
+        // Given:
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+        // When:
+        notify.destroy();
+        // Then:
+        try {
+            notify.matches();
+            fail("Should have thrown an exception");
+        } catch (IllegalStateException e) {
+            assertEquals("NotifyBuilder has not been created. Invoke the create() method before matching.", e.getMessage());
+        }
+    }
+
+    public void testDestroyedBuilderCannotBeRecreated() throws Exception {
+        // Given:
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+        // When:
+        notify.destroy();
+        // Then:
+        try {
+            notify.create();
+            fail("Should have thrown an exception");
+        } catch (IllegalStateException e) {
+            assertEquals("A destroyed NotifyBuilder cannot be re-created.", e.getMessage());
+        }
+    }
+
     public void testDirectWhenExchangeDoneSimple() throws Exception {
         NotifyBuilder notify = new NotifyBuilder(context)
                 .from("direct:foo").whenDone(1)

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.