You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/03 17:51:08 UTC

[1/2] git commit: CAMEL-6151: Added support for blocking direct producers. Thanks to Aaron Whiteside for partial patch.

Updated Branches:
  refs/heads/camel-2.11.x 6f10a76bc -> 6c38b9036
  refs/heads/master e14c66357 -> 4684225fd


CAMEL-6151: Added support for blocking direct producers. Thanks to Aaron Whiteside for partial patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4684225f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4684225f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4684225f

Branch: refs/heads/master
Commit: 4684225fd15b04021c28605e7d37c1ef89b08d5f
Parents: e14c663
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 3 17:48:53 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jun 3 17:48:53 2013 +0200

----------------------------------------------------------------------
 .../component/direct/DirectBlockingProducer.java   |   97 +++++++++++++++
 .../camel/component/direct/DirectConsumer.java     |    5 +
 .../camel/component/direct/DirectEndpoint.java     |   41 ++++++-
 .../direct/DirectProducerBlockingTest.java         |   96 ++++++++++++++
 .../camel/impl/CustomExchangePatternTest.java      |   96 --------------
 5 files changed, 234 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
new file mode 100644
index 0000000..1b28bed
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.direct;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The direct producer.
+ * <p/>
+ * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance
+ * of this class instead of {@code DirectProducer}.
+ * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
+ * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
+ * will be thrown.
+ * <p/>
+ * Implementation note: Concurrent Producers will block for the duration it takes to determine if a
+ * consumer is available, but actual consumer execution will happen concurrently.
+ */
+public class DirectBlockingProducer extends DefaultAsyncProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DirectBlockingProducer.class);
+    private final DirectEndpoint endpoint;
+
+    public DirectBlockingProducer(DirectEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        getConsumer(exchange).getProcessor().process(exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+   protected DirectConsumer getConsumer(Exchange exchange) throws Exception {
+        DirectConsumer answer = endpoint.getConsumer();
+        if (answer == null) {
+            // okay then await until we have a consumer or we timed out
+            answer = awaitConsumer();
+            if (answer == null) {
+                LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+                throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            }
+        }
+
+        return answer;
+    }
+
+    private DirectConsumer awaitConsumer() throws InterruptedException {
+        DirectConsumer answer = null;
+
+        StopWatch watch = new StopWatch();
+        boolean done = false;
+        while (!done) {
+            // sleep a bit to give chance for the consumer to be ready
+            Thread.sleep(500);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waited {} for consumer to be ready", watch.taken());
+            }
+
+            answer = endpoint.getConsumer();
+            if (answer != null) {
+                return answer;
+            }
+            // we are done if we hit the timeout
+            done = watch.taken() >= endpoint.getTimeout();
+        }
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index f97128d..83dbbca 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -38,6 +38,11 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     }
 
     @Override
+    public DirectEndpoint getEndpoint() {
+        return (DirectEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         // add consumer to endpoint
         boolean existing = this == endpoint.getConsumer();

http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 9936505..844a74b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Represents a direct endpoint that synchronously invokes the consumer of the
@@ -34,6 +35,8 @@ import org.apache.camel.impl.DefaultEndpoint;
 public class DirectEndpoint extends DefaultEndpoint {
 
     private volatile Map<String, DirectConsumer> consumers;
+    private boolean block;
+    private long timeout = 30000L;
 
     public DirectEndpoint() {
         this.consumers = new HashMap<String, DirectConsumer>();
@@ -49,7 +52,11 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        return new DirectProducer(this);
+        if (block) {
+            return new DirectBlockingProducer(this);
+        } else {
+            return new DirectProducer(this);
+        }
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -63,23 +70,47 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public void addConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         consumers.put(key, consumer);
     }
 
     public void removeConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         consumers.remove(key);
     }
 
     public boolean hasConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         return consumers.containsKey(key);
     }
 
     public DirectConsumer getConsumer() {
-        String key = getEndpointKey();
+        String key = getKey();
         return consumers.get(key);
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    protected String getKey() {
+        String uri = getEndpointUri();
+        if (uri.indexOf('?') != -1) {
+            return ObjectHelper.before(uri, "?");
+        } else {
+            return uri;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
new file mode 100644
index 0000000..5322202
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.direct;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+public class DirectProducerBlockingTest extends ContextTestSupport {
+
+    public void testProducerBlocksForSuspendedConsumer() throws Exception {
+        DirectEndpoint endpoint = getMandatoryEndpoint("direct:suspended", DirectEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct:suspended?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksWithNoConsumers() throws Exception {
+        DirectEndpoint endpoint = getMandatoryEndpoint("direct:suspended", DirectEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct:start?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksResumeTest() throws Exception {
+        context.suspendRoute("foo");
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                    log.info("Resuming consumer");
+                    context.resumeRoute("foo");
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        });
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:suspended?block=true&timeout=5000", "hello world");
+
+        assertMockEndpointsSatisfied();
+
+        executor.shutdownNow();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:suspended").routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4684225f/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java b/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
deleted file mode 100644
index cc942df..0000000
--- a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * @version 
- */
-public class CustomExchangePatternTest extends ContextTestSupport {
-    protected MockEndpoint resultEndpoint;
-
-    public void testInOut() throws Exception {
-        final ExchangePattern expectedPattern = ExchangePattern.InOut;
-
-        template.send("direct:start", expectedPattern, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                assertEquals("MEP", expectedPattern, exchange.getPattern());
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    public void testInOnly() throws Exception {
-        ExchangePattern expectedPattern = ExchangePattern.InOnly;
-
-        template.send("direct:start", expectedPattern, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    public void testInOutViaUri() throws Exception {
-        final ExchangePattern expectedPattern = ExchangePattern.InOut;
-
-        template.send("direct:start?exchangePattern=InOut", new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                assertEquals("MEP", expectedPattern, exchange.getPattern());
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-    }
-
-    protected void assertReceivedExpectedPattern(ExchangePattern expectedPattern) {
-        List<Exchange> list = resultEndpoint.getReceivedExchanges();
-        Exchange exchange = list.get(0);
-        assertEquals("MEP", expectedPattern, exchange.getPattern());
-    }
-
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                from("direct:start").to("mock:result");
-                from("direct:start?exchangePattern=InOut").to("mock:result");
-            }
-        };
-    }
-}
\ No newline at end of file


[2/2] git commit: CAMEL-6151: Added support for blocking direct producers. Thanks to Aaron Whiteside for partial patch.

Posted by da...@apache.org.
CAMEL-6151: Added support for blocking direct producers. Thanks to Aaron Whiteside for partial patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c38b903
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c38b903
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c38b903

Branch: refs/heads/camel-2.11.x
Commit: 6c38b9036c9c0509311c14bca96354f4f76365d3
Parents: 6f10a76
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 3 17:48:53 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jun 3 17:50:38 2013 +0200

----------------------------------------------------------------------
 .../component/direct/DirectBlockingProducer.java   |   97 +++++++++++++++
 .../camel/component/direct/DirectConsumer.java     |    5 +
 .../camel/component/direct/DirectEndpoint.java     |   41 ++++++-
 .../direct/DirectProducerBlockingTest.java         |   96 ++++++++++++++
 .../camel/impl/CustomExchangePatternTest.java      |   96 --------------
 5 files changed, 234 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6c38b903/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
new file mode 100644
index 0000000..1b28bed
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.direct;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The direct producer.
+ * <p/>
+ * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance
+ * of this class instead of {@code DirectProducer}.
+ * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
+ * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
+ * will be thrown.
+ * <p/>
+ * Implementation note: Concurrent Producers will block for the duration it takes to determine if a
+ * consumer is available, but actual consumer execution will happen concurrently.
+ */
+public class DirectBlockingProducer extends DefaultAsyncProducer {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DirectBlockingProducer.class);
+    private final DirectEndpoint endpoint;
+
+    public DirectBlockingProducer(DirectEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        getConsumer(exchange).getProcessor().process(exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+   protected DirectConsumer getConsumer(Exchange exchange) throws Exception {
+        DirectConsumer answer = endpoint.getConsumer();
+        if (answer == null) {
+            // okay then await until we have a consumer or we timed out
+            answer = awaitConsumer();
+            if (answer == null) {
+                LOG.warn("No consumers available on endpoint: " + endpoint + " to process: " + exchange);
+                throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            }
+        }
+
+        return answer;
+    }
+
+    private DirectConsumer awaitConsumer() throws InterruptedException {
+        DirectConsumer answer = null;
+
+        StopWatch watch = new StopWatch();
+        boolean done = false;
+        while (!done) {
+            // sleep a bit to give chance for the consumer to be ready
+            Thread.sleep(500);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Waited {} for consumer to be ready", watch.taken());
+            }
+
+            answer = endpoint.getConsumer();
+            if (answer != null) {
+                return answer;
+            }
+            // we are done if we hit the timeout
+            done = watch.taken() >= endpoint.getTimeout();
+        }
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6c38b903/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index f97128d..83dbbca 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -38,6 +38,11 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     }
 
     @Override
+    public DirectEndpoint getEndpoint() {
+        return (DirectEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         // add consumer to endpoint
         boolean existing = this == endpoint.getConsumer();

http://git-wip-us.apache.org/repos/asf/camel/blob/6c38b903/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 9936505..844a74b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Represents a direct endpoint that synchronously invokes the consumer of the
@@ -34,6 +35,8 @@ import org.apache.camel.impl.DefaultEndpoint;
 public class DirectEndpoint extends DefaultEndpoint {
 
     private volatile Map<String, DirectConsumer> consumers;
+    private boolean block;
+    private long timeout = 30000L;
 
     public DirectEndpoint() {
         this.consumers = new HashMap<String, DirectConsumer>();
@@ -49,7 +52,11 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        return new DirectProducer(this);
+        if (block) {
+            return new DirectBlockingProducer(this);
+        } else {
+            return new DirectProducer(this);
+        }
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -63,23 +70,47 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public void addConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         consumers.put(key, consumer);
     }
 
     public void removeConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         consumers.remove(key);
     }
 
     public boolean hasConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getEndpointKey();
+        String key = consumer.getEndpoint().getKey();
         return consumers.containsKey(key);
     }
 
     public DirectConsumer getConsumer() {
-        String key = getEndpointKey();
+        String key = getKey();
         return consumers.get(key);
     }
 
+    public boolean isBlock() {
+        return block;
+    }
+
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    protected String getKey() {
+        String uri = getEndpointUri();
+        if (uri.indexOf('?') != -1) {
+            return ObjectHelper.before(uri, "?");
+        } else {
+            return uri;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6c38b903/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
new file mode 100644
index 0000000..5322202
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.direct;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+
+public class DirectProducerBlockingTest extends ContextTestSupport {
+
+    public void testProducerBlocksForSuspendedConsumer() throws Exception {
+        DirectEndpoint endpoint = getMandatoryEndpoint("direct:suspended", DirectEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct:suspended?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksWithNoConsumers() throws Exception {
+        DirectEndpoint endpoint = getMandatoryEndpoint("direct:suspended", DirectEndpoint.class);
+        endpoint.getConsumer().suspend();
+
+        StopWatch watch = new StopWatch();
+        try {
+            template.sendBody("direct:start?block=true&timeout=2000", "hello world");
+            fail("Expected CamelExecutionException");
+        } catch (CamelExecutionException e) {
+            DirectConsumerNotAvailableException cause = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
+            assertIsInstanceOf(CamelExchangeException.class, cause);
+
+            assertTrue(watch.taken() > 1500);
+        }
+    }
+
+    public void testProducerBlocksResumeTest() throws Exception {
+        context.suspendRoute("foo");
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                    log.info("Resuming consumer");
+                    context.resumeRoute("foo");
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        });
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:suspended?block=true&timeout=5000", "hello world");
+
+        assertMockEndpointsSatisfied();
+
+        executor.shutdownNow();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:suspended").routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6c38b903/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java b/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
deleted file mode 100644
index cc942df..0000000
--- a/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-import java.util.List;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * @version 
- */
-public class CustomExchangePatternTest extends ContextTestSupport {
-    protected MockEndpoint resultEndpoint;
-
-    public void testInOut() throws Exception {
-        final ExchangePattern expectedPattern = ExchangePattern.InOut;
-
-        template.send("direct:start", expectedPattern, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                assertEquals("MEP", expectedPattern, exchange.getPattern());
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    public void testInOnly() throws Exception {
-        ExchangePattern expectedPattern = ExchangePattern.InOnly;
-
-        template.send("direct:start", expectedPattern, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    public void testInOutViaUri() throws Exception {
-        final ExchangePattern expectedPattern = ExchangePattern.InOut;
-
-        template.send("direct:start?exchangePattern=InOut", new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                assertEquals("MEP", expectedPattern, exchange.getPattern());
-                exchange.getIn().setBody("<hello>world!</hello>");
-            }
-        });
-
-        resultEndpoint.assertIsSatisfied();
-        assertReceivedExpectedPattern(expectedPattern);
-    }
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-    }
-
-    protected void assertReceivedExpectedPattern(ExchangePattern expectedPattern) {
-        List<Exchange> list = resultEndpoint.getReceivedExchanges();
-        Exchange exchange = list.get(0);
-        assertEquals("MEP", expectedPattern, exchange.getPattern());
-    }
-
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                from("direct:start").to("mock:result");
-                from("direct:start?exchangePattern=InOut").to("mock:result");
-            }
-        };
-    }
-}
\ No newline at end of file