You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/11 12:58:13 UTC

svn commit: r813776 - in /camel/trunk: camel-core/ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/mbean/ camel-core/src/main/java/org/apache/camel/spi/ cam...

Author: davsclaus
Date: Fri Sep 11 10:58:10 2009
New Revision: 813776

URL: http://svn.apache.org/viewvc?rev=813776&view=rev
Log:
CAMEL-1679: Added InflightRepository with a very basic implementation that can help with graceful shutdown to avoid shutting down if there are in progress exchanges.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java   (with props)
Modified:
    camel/trunk/camel-core/   (props changed)
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java

Propchange: camel/trunk/camel-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Sep 11 10:58:10 2009
@@ -6,3 +6,4 @@
 .settings
 eclipse-classes
 *.i??
+classes

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=813776&r1=813775&r2=813776&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Fri Sep 11 10:58:10 2009
@@ -28,6 +28,7 @@
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Language;
@@ -552,4 +553,8 @@
      */
     void disableJMX();
 
+    InflightRepository getInflightRepository();
+
+    void setInflightRepository(InflightRepository repository);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=813776&r1=813775&r2=813776&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Sep 11 10:58:10 2009
@@ -63,6 +63,7 @@
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Language;
@@ -133,6 +134,7 @@
     private ServicePool<Endpoint, Producer> producerServicePool = new DefaultProducerServicePool(100);
     private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory();
     private Tracer defaultTracer;
+    private InflightRepository inflightRepository = new DefaultInflightRepository();
 
     public DefaultCamelContext() {
         super();
@@ -1206,6 +1208,14 @@
         disableJMX = true;
     }
 
+    public InflightRepository getInflightRepository() {
+        return inflightRepository;
+    }
+
+    public void setInflightRepository(InflightRepository repository) {
+        this.inflightRepository = repository;
+    }
+
     protected String getEndpointKey(String uri, Endpoint endpoint) {
         if (endpoint.isSingleton()) {
             return uri;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=813776&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java Fri Sep 11 10:58:10 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.InflightRepository;
+
+/**
+ * Default implement which just uses a counter
+ *
+ * @version $Revision$
+ */
+public class DefaultInflightRepository implements InflightRepository {
+
+    private final AtomicInteger count = new AtomicInteger();
+
+    public void add(Exchange exchange) {
+        count.incrementAndGet();
+    }
+
+    public void remove(Exchange exchange) {
+        count.decrementAndGet();
+    }
+
+    public int size() {
+        return count.get();
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=813776&r1=813775&r2=813776&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Fri Sep 11 10:58:10 2009
@@ -64,6 +64,11 @@
 
         // fire event
         EventHelper.notifyExchangeCreated(exchange.getContext(), exchange);
+
+        // register to inflight registry
+        if (exchange.getContext() != null) {
+            exchange.getContext().getInflightRepository().add(exchange);
+        }
     }
 
     public void start() throws Exception {
@@ -133,6 +138,12 @@
                 }
             }
         }
+
+        // unregister from inflight registry
+        if (exchange.getContext() != null) {
+            exchange.getContext().getInflightRepository().remove(exchange);
+        }
+
     }
 
     public String getId() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=813776&r1=813775&r2=813776&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Fri Sep 11 10:58:10 2009
@@ -76,6 +76,11 @@
         context.setTracing(tracing);
     }
 
+    @ManagedAttribute(description = "Current number of inflight Exchanges")
+    public Integer getInflightExchanges() {
+        return context.getInflightRepository().size();
+    }
+
     @ManagedOperation(description = "Start Camel")
     public void start() throws Exception {
         context.start();

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=813776&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Fri Sep 11 10:58:10 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+
+/**
+ * @version $Revision$
+ */
+public interface InflightRepository {
+
+    /**
+     * Adds the exchange to the inflight registry
+     *
+     * @param exchange  the exchange
+     */
+    void add(Exchange exchange);
+
+    /**
+     * Removes the exchange from the inflight registry
+     *
+     * @param exchange  the exchange
+     */
+    void remove(Exchange exchange);
+
+    /**
+     * Current size of inflight exchanges.
+     * <p/>
+     * Will return 0 if there are no inflight exchanges.
+     *
+     * @return number of exchanges currently in flight.
+     */
+    int size();
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java?rev=813776&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java Fri Sep 11 10:58:10 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.InflightRepository;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultInflightRepositoryTest extends ContextTestSupport {
+
+    public void testDefaultInflightRepository() throws Exception {
+        InflightRepository repo = new DefaultInflightRepository();
+
+        assertEquals(0, repo.size());
+
+        Exchange e1 = new DefaultExchange(context);
+        repo.add(e1);
+        assertEquals(1, repo.size());
+
+        Exchange e2 = new DefaultExchange(context);
+        repo.add(e2);
+        assertEquals(2, repo.size());
+
+        repo.remove(e2);
+        assertEquals(1, repo.size());
+
+        repo.remove(e1);
+        assertEquals(0, repo.size());
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=813776&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java Fri Sep 11 10:58:10 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class InflightRepositoryRouteTest extends ContextTestSupport {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public void testInflight() throws Exception {
+        assertEquals(0, context.getInflightRepository().size());
+
+        template.asyncSendBody("direct:start", "Hello World");
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(1, context.getInflightRepository().size());
+
+        // wait to be sure its done
+        Thread.sleep(2000);
+
+        assertEquals(0, context.getInflightRepository().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                latch.countDown();
+                            }
+                        }).delay(1000).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=813776&r1=813775&r2=813776&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Fri Sep 11 10:58:10 2009
@@ -62,6 +62,7 @@
 import org.apache.camel.spi.EventFactory;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementStrategy;
@@ -234,6 +235,12 @@
             getContext().addInterceptStrategy(delayer);
         }
 
+        InflightRepository inflightRepository = getBeanForType(InflightRepository.class);
+        if (delayer != null) {
+            LOG.info("Using custom InflightRepository: " + inflightRepository);
+            getContext().setInflightRepository(inflightRepository);
+        }
+
         ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class);
         if (managementStrategy != null) {
             LOG.info("Using custom ManagementStrategy: " + managementStrategy);