You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/05 15:55:12 UTC

git commit: CAMEL-6151: Added support for blocking direct-vm producers.

Updated Branches:
  refs/heads/camel-2.11.x 66e1c2812 -> d69fe7af4


CAMEL-6151: Added support for blocking direct-vm producers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d69fe7af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d69fe7af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d69fe7af

Branch: refs/heads/camel-2.11.x
Commit: d69fe7af49e4ee723a4a80b3ed2584daa0a38e74
Parents: 66e1c28
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 5 15:54:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jun 5 15:54:54 2013 +0200

----------------------------------------------------------------------
 .../directvm/DirectVmBlockingProducer.java         |   97 +++++++++++++++
 .../component/directvm/DirectVmComponent.java      |   19 +++
 .../camel/component/directvm/DirectVmConsumer.java |   13 ++-
 .../camel/component/directvm/DirectVmEndpoint.java |   24 ++++-
 .../directvm/AbstractDirectVmTestSupport.java      |    2 +-
 .../directvm/DirectVmProducerBlockingTest.java     |   96 ++++++++++++++
 6 files changed, 248 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
new file mode 100644
index 0000000..77aa69c
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmBlockingProducer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The direct producer.
+ * <p/>
+ * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance
+ * of this class instead of {@code DirectProducer}.
+ * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
+ * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
+ * will be thrown.
+ * <p/>
+ * Implementation note: Concurrent Producers will block for the duration it takes to determine if a
+ * consumer is available, but actual consumer execution will happen concurrently.
+ */
+public class DirectVmBlockingProducer extends DefaultAsyncProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DirectVmBlockingProducer.class);
+    private final DirectVmEndpoint endpoint;
+
+    public DirectVmBlockingProducer(DirectVmEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        getConsumer(exchange).getProcessor().process(exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    protected DirectVmConsumer getConsumer(Exchange exchange) throws Exception {
+        DirectVmConsumer answer = endpoint.getConsumer();
+        if (answer == null) {
+            // okay then await until we have a consumer or we timed out
+            answer = awaitConsumer();
+            if (answer == null) {
+                LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+                throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            }
+        }
+
+        return answer;
+    }
+
+    private DirectVmConsumer awaitConsumer() throws InterruptedException {
+        DirectVmConsumer answer = null;
+
+        StopWatch watch = new StopWatch();
+        boolean done = false;
+        while (!done) {
+            // sleep a bit to give chance for the consumer to be ready
+            Thread.sleep(500);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waited {} for consumer to be ready", watch.taken());
+            }
+
+            answer = endpoint.getConsumer();
+            if (answer != null) {
+                return answer;
+            }
+            // we are done if we hit the timeout
+            done = watch.taken() >= endpoint.getTimeout();
+        }
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
index 513d174..6ad3d28 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
@@ -38,6 +38,8 @@ public class DirectVmComponent extends DefaultComponent {
     // later in case the DirectVmEndpoint was re-created due the old was evicted from the endpoints LRUCache
     // on DefaultCamelContext
     private static final ConcurrentMap<String, DirectVmConsumer> CONSUMERS = new ConcurrentHashMap<String, DirectVmConsumer>();
+    private boolean block;
+    private long timeout = 30000L;
 
     /**
      * Gets all the consumer endpoints.
@@ -55,6 +57,8 @@ public class DirectVmComponent extends DefaultComponent {
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         DirectVmEndpoint answer = new DirectVmEndpoint(uri, this);
+        answer.setBlock(block);
+        answer.setTimeout(timeout);
         answer.configureProperties(parameters);
         return answer;
     }
@@ -101,4 +105,19 @@ public class DirectVmComponent extends DefaultComponent {
         super.doStop();
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
index 5ab0bb0..037b7e2 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
@@ -17,12 +17,13 @@
 package org.apache.camel.component.directvm;
 
 import org.apache.camel.Processor;
+import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.DefaultConsumer;
 
 /**
  * The direct-vm consumer
  */
-public class DirectVmConsumer extends DefaultConsumer {
+public class DirectVmConsumer extends DefaultConsumer implements SuspendableService {
 
     public DirectVmConsumer(DirectVmEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -45,4 +46,14 @@ public class DirectVmConsumer extends DefaultConsumer {
         super.doStop();
     }
 
+    @Override
+    protected void doSuspend() throws Exception {
+        getEndpoint().getComponent().removeConsumer(getEndpoint(), this);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        getEndpoint().getComponent().addConsumer(getEndpoint(), this);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
index 230b93f..d5a9885 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
@@ -26,6 +26,9 @@ import org.apache.camel.impl.DefaultEndpoint;
  */
 public class DirectVmEndpoint extends DefaultEndpoint {
 
+    private boolean block;
+    private long timeout = 30000L;
+
     public DirectVmEndpoint(String endpointUri, DirectVmComponent component) {
         super(endpointUri, component);
     }
@@ -37,7 +40,11 @@ public class DirectVmEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new DirectVmProducer(this);
+        if (block) {
+            return new DirectVmBlockingProducer(this);
+        } else {
+            return new DirectVmProducer(this);
+        }
     }
 
     @Override
@@ -56,4 +63,19 @@ public class DirectVmEndpoint extends DefaultEndpoint {
         return getComponent().getConsumer(this);
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java b/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java
index 2724813..f093a18 100644
--- a/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/AbstractDirectVmTestSupport.java
@@ -28,7 +28,7 @@ import org.junit.Before;
 /**
  *
  */
-public class AbstractDirectVmTestSupport extends ContextTestSupport {
+public abstract class AbstractDirectVmTestSupport extends ContextTestSupport {
     protected CamelContext context2;
     protected ProducerTemplate template2;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d69fe7af/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
new file mode 100644
index 0000000..9c92785
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmProducerBlockingTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+public class DirectVmProducerBlockingTest extends ContextTestSupport {
+
+    public void testProducerBlocksForSuspendedConsumer() throws Exception {
+        DirectVmEndpoint endpoint = getMandatoryEndpoint("direct-vm:suspended", DirectVmEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct-vm:suspended?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksWithNoConsumers() throws Exception {
+        DirectVmEndpoint endpoint = getMandatoryEndpoint("direct-vm:suspended", DirectVmEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct-vm:start?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectVmConsumerNotAvailableException cause = assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksResumeTest() throws Exception {
+        context.suspendRoute("foo");
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                    log.info("Resuming consumer");
+                    context.resumeRoute("foo");
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        });
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct-vm:suspended?block=true&timeout=5000", "hello world");
+
+        assertMockEndpointsSatisfied();
+
+        executor.shutdownNow();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct-vm:suspended").routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}