You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/08/02 05:35:25 UTC

[1/3] git commit: CAMEL-6588 fixed CS errors

Updated Branches:
  refs/heads/master 785302319 -> e19acac50


CAMEL-6588 fixed CS errors


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e19acac5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e19acac5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e19acac5

Branch: refs/heads/master
Commit: e19acac5051a8bbbd043d5aec9e390838b765c68
Parents: 276aa87
Author: Willem Jiang <ni...@apache.org>
Authored: Fri Aug 2 10:57:03 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Fri Aug 2 11:33:33 2013 +0800

----------------------------------------------------------------------
 .../seda/ArrayBlockingQueueFactory.java         | 72 ++++++++++----------
 .../seda/PriorityBlockingQueueFactory.java      | 36 +++++-----
 .../camel/component/seda/SedaComponent.java     | 41 ++++++-----
 .../camel/component/seda/SedaEndpoint.java      | 18 ++---
 .../component/seda/SedaQueueFactoryTest.java    | 55 ++++++++-------
 .../camel/component/seda/SedaQueueTest.java     | 23 ++++---
 6 files changed, 125 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
index 3983ff8..b075467 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
@@ -23,42 +23,43 @@ import java.util.concurrent.ArrayBlockingQueue;
  * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.ArrayBlockingQueue}
  */
 public class ArrayBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
-	/**
-	 * Capacity used when none provided
-	 */
-	private int defaultCapacity=50;
-	/**
-	 * Lock fairness. null means default fairness
-	 */
-	private Boolean fair;
-	/**
-	 * @return Default array capacity
-	 */
-	public int getDefaultCapacity() {
-		return defaultCapacity;
-	}
+    /**
+     * Capacity used when none provided
+     */
+    private int defaultCapacity = 50;
+    /**
+     * Lock fairness. null means default fairness
+     */
+    private Boolean fair;
 
-	/**
-	 * @param defaultCapacity Default array capacity
-	 */
-	public void setDefaultCapacity(int defaultCapacity) {
-		this.defaultCapacity = defaultCapacity;
-	}
+    /**
+     * @return Default array capacity
+     */
+    public int getDefaultCapacity() {
+        return defaultCapacity;
+    }
+
+    /**
+     * @param defaultCapacity Default array capacity
+     */
+    public void setDefaultCapacity(int defaultCapacity) {
+        this.defaultCapacity = defaultCapacity;
+    }
 
-	/**
-	 * @return Lock fairness
-	 */
-	public boolean isFair() {
-		return fair;
-	}
+    /**
+     * @return Lock fairness
+     */
+    public boolean isFair() {
+        return fair;
+    }
+
+    /**
+     * @param fair Lock fairness
+     */
+    public void setFair(boolean fair) {
+        this.fair = fair;
+    }
 
-	/**
-	 * @param fair Lock fairness
-	 */
-	public void setFair(boolean fair) {
-		this.fair = fair;
-	}
-	
     @Override
     public ArrayBlockingQueue<E> create() {
         return create(defaultCapacity);
@@ -66,8 +67,7 @@ public class ArrayBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
 
     @Override
     public ArrayBlockingQueue<E> create(int capacity) {
-        return fair == null ? 
-				new ArrayBlockingQueue<E>(defaultCapacity) :
-				new ArrayBlockingQueue<E>(defaultCapacity, fair) ;
+        return fair == null
+            ? new ArrayBlockingQueue<E>(defaultCapacity) : new ArrayBlockingQueue<E>(defaultCapacity, fair);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
index da90d16..a1bd36f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
@@ -24,31 +24,31 @@ import java.util.concurrent.PriorityBlockingQueue;
  * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.PriorityBlockingQueue}
  */
 public class PriorityBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
-	/**
-	 * Comparator used to sort exchanges
-	 */
-	private Comparator<E> comparator;
+    /**
+     * Comparator used to sort exchanges
+     */
+    private Comparator<E> comparator;
 
-	public Comparator<E> getComparator() {
-		return comparator;
-	}
+    public Comparator<E> getComparator() {
+        return comparator;
+    }
+
+    public void setComparator(Comparator<E> comparator) {
+        this.comparator = comparator;
+    }
 
-	public void setComparator(Comparator<E> comparator) {
-		this.comparator = comparator;
-	}
-	
     @Override
     public PriorityBlockingQueue<E> create() {
-        return comparator==null ?
-				new PriorityBlockingQueue<E>() :
-				// PriorityQueue as a default capacity of 11
-				new PriorityBlockingQueue<E>(11, comparator);
+        return comparator == null 
+            ? new PriorityBlockingQueue<E>()
+            // PriorityQueue as a default capacity of 11
+            : new PriorityBlockingQueue<E>(11, comparator);
     }
 
     @Override
     public PriorityBlockingQueue<E> create(int capacity) {
-        return comparator==null?
-				new PriorityBlockingQueue<E>(capacity):
-				new PriorityBlockingQueue<E>(capacity, comparator);
+        return comparator == null
+            ? new PriorityBlockingQueue<E>(capacity)
+            : new PriorityBlockingQueue<E>(capacity, comparator);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index b13dd64..dd7f6a5 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -38,7 +38,7 @@ public class SedaComponent extends UriEndpointComponent {
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
     private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
-    private BlockingQueueFactory<Exchange> defaultQueueFactory =new LinkedBlockingQueueFactory<Exchange>();
+    private BlockingQueueFactory<Exchange> defaultQueueFactory = new LinkedBlockingQueueFactory<Exchange>();
     public SedaComponent() {
         super(SedaEndpoint.class);
     }
@@ -59,13 +59,13 @@ public class SedaComponent extends UriEndpointComponent {
         return defaultConcurrentConsumers;
     }
 
-	public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
-		return defaultQueueFactory;
-	}
+    public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
+        return defaultQueueFactory;
+    }
 
-	public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
-		this.defaultQueueFactory = defaultQueueFactory;
-	}
+    public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
+        this.defaultQueueFactory = defaultQueueFactory;
+    }
 
     /**
      * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)}
@@ -135,6 +135,7 @@ public class SedaComponent extends UriEndpointComponent {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
         boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);
@@ -142,17 +143,21 @@ public class SedaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-		// Resolve queue reference
-		BlockingQueue<Exchange> queue=resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
-		SedaEndpoint answer;
-		// Resolve queue factory when no queue specified
-		if (queue == null) {
-			BlockingQueueFactory<Exchange> queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
-			// defer creating queue till endpoint is started, so we pass the queue factory
-			answer = new SedaEndpoint(uri, this, queueFactory, consumers);			
-		} else {
-			answer = new SedaEndpoint(uri, this, queue, consumers);
-		}
+        // Resolve queue reference
+        BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue",
+                                                                           BlockingQueue.class);
+        SedaEndpoint answer;
+        // Resolve queue factory when no queue specified
+        if (queue == null) {
+            BlockingQueueFactory<Exchange> queueFactory = resolveAndRemoveReferenceParameter(parameters,
+                                                                                             "queueFactory",
+                                                                                             BlockingQueueFactory.class);
+            // defer creating queue till endpoint is started, so we pass the
+            // queue factory
+            answer = new SedaEndpoint(uri, this, queueFactory, consumers);
+        } else {
+            answer = new SedaEndpoint(uri, this, queue, consumers);
+        }
         answer.configureProperties(parameters);
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index d806d60..aa16c6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -81,7 +81,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     private BlockingQueueFactory<Exchange> queueFactory;
 
     public SedaEndpoint() {
-		queueFactory = new LinkedBlockingQueueFactory<Exchange>();
+        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
     }
 
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
@@ -89,19 +89,19 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
-		this(endpointUri, component, concurrentConsumers);
+        this(endpointUri, component, concurrentConsumers);
         this.queue = queue;
         if (queue != null) {
             this.size = queue.remainingCapacity();
         }
-		queueFactory = new LinkedBlockingQueueFactory<Exchange>();
-	}
-	
+        queueFactory = new LinkedBlockingQueueFactory<Exchange>();
+    }
+
     public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
-		this(endpointUri, component, concurrentConsumers);
-		this.queueFactory = queueFactory;
-	}
-	
+        this(endpointUri, component, concurrentConsumers);
+        this.queueFactory = queueFactory;
+    }
+
     private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
         super(endpointUri, component);
         this.concurrentConsumers = concurrentConsumers;

http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
index 033ce80..062df5a 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
@@ -1,9 +1,10 @@
-/*
- * Copyright 2013 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,14 +16,13 @@
  */
 package org.apache.camel.component.seda;
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ArrayBlockingQueue;
-import static junit.framework.TestCase.assertEquals;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import static org.apache.camel.TestSupport.assertIsInstanceOf;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 
@@ -30,37 +30,36 @@ import org.apache.camel.impl.SimpleRegistry;
  *
  */
 public class SedaQueueFactoryTest extends ContextTestSupport {
-	private final ArrayBlockingQueueFactory<Exchange> arrayQueueFactory=new ArrayBlockingQueueFactory<Exchange>();
+    private final ArrayBlockingQueueFactory<Exchange> arrayQueueFactory = new ArrayBlockingQueueFactory<Exchange>();
 
-	@Override
-	protected CamelContext createCamelContext() throws Exception {
-		SimpleRegistry simpleRegistry=new SimpleRegistry();
-		simpleRegistry.put("arrayQueueFactory", arrayQueueFactory);
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry simpleRegistry = new SimpleRegistry();
+        simpleRegistry.put("arrayQueueFactory", arrayQueueFactory);
         return new DefaultCamelContext(simpleRegistry);
-	}
-	
-    @SuppressWarnings("unchecked")
+    }
+
+   
     public void testArrayBlockingQueueFactory() throws Exception {
         SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue?queueFactory=#arrayQueueFactory", SedaEndpoint.class);
-		
+
         BlockingQueue<Exchange> queue = endpoint.getQueue();
-        ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
-    }	
+        assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+    }
 
-	@SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     public void testArrayBlockingQueueFactoryAndSize() throws Exception {
         SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue50?queueFactory=#arrayQueueFactory&size=50", SedaEndpoint.class);
-		
+
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
-		assertEquals("remainingCapacity", 50, blockingQueue.remainingCapacity());
-    }	
+        assertEquals("remainingCapacity", 50, blockingQueue.remainingCapacity());
+    }
 
-	@SuppressWarnings("unchecked")
+    
     public void testDefaultBlockingQueueFactory() throws Exception {
         SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:linkedQueue", SedaEndpoint.class);
-		
         BlockingQueue<Exchange> queue = endpoint.getQueue();
-        LinkedBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue);
-    }	
+        assertIsInstanceOf(LinkedBlockingQueue.class, queue);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e19acac5/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
index 54ccdd5..bfdf99b 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.seda;
 
 import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -24,8 +25,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
-import org.hamcrest.CoreMatchers;
-import org.junit.matchers.JUnitMatchers;
 
 /**
  * @version 
@@ -46,16 +45,18 @@ public class SedaQueueTest extends ContextTestSupport {
         mock.expectedBodiesReceived("Hello World");
 
         template.sendBody("seda:array?queue=#arrayQueue", "Hello World");
-		
-		SedaEndpoint sedaEndpoint=resolveMandatoryEndpoint("seda:array?queue=#arrayQueue", SedaEndpoint.class);
-		assertTrue(sedaEndpoint.getQueue() instanceof ArrayBlockingQueue);
+
+        SedaEndpoint sedaEndpoint = resolveMandatoryEndpoint("seda:array?queue=#arrayQueue",
+                                                             SedaEndpoint.class);
+        assertTrue(sedaEndpoint.getQueue() instanceof ArrayBlockingQueue);
     }
-	@Override
-	protected CamelContext createCamelContext() throws Exception {
-		SimpleRegistry simpleRegistry=new SimpleRegistry();
-		simpleRegistry.put("arrayQueue", new ArrayBlockingQueue<Exchange>(10));
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry simpleRegistry = new SimpleRegistry();
+        simpleRegistry.put("arrayQueue", new ArrayBlockingQueue<Exchange>(10));
         return new DefaultCamelContext(simpleRegistry);
-	}
+    }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -66,7 +67,7 @@ public class SedaQueueTest extends ContextTestSupport {
 
                 from("seda:bar").to("mock:result");
 
-				from("seda:array?queue=#arrayQueue").to("mock:result");
+                from("seda:array?queue=#arrayQueue").to("mock:result");
             }
         };
     }


[2/3] git commit: CAMEL-6588 Choose BlockingQueue implementation in Seda component with thanks to Gérald

Posted by ni...@apache.org.
CAMEL-6588 Choose BlockingQueue implementation in Seda component with thanks to Gérald


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/276aa87e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/276aa87e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/276aa87e

Branch: refs/heads/master
Commit: 276aa87ec38bfd884a3dc52d8685d6be3922f553
Parents: f58ccf9
Author: Willem Jiang <ni...@apache.org>
Authored: Fri Aug 2 10:41:13 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Fri Aug 2 11:33:33 2013 +0800

----------------------------------------------------------------------
 .../seda/ArrayBlockingQueueFactory.java         | 73 ++++++++++++++++++++
 .../component/seda/BlockingQueueFactory.java    | 38 ++++++++++
 .../seda/LinkedBlockingQueueFactory.java        | 35 ++++++++++
 .../seda/PriorityBlockingQueueFactory.java      | 54 +++++++++++++++
 .../camel/component/seda/SedaComponent.java     | 40 ++++++++---
 .../camel/component/seda/SedaEndpoint.java      | 21 ++++--
 .../component/seda/SedaQueueFactoryTest.java    | 66 ++++++++++++++++++
 .../camel/component/seda/SedaQueueTest.java     | 24 +++++++
 8 files changed, 338 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
new file mode 100644
index 0000000..3983ff8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/ArrayBlockingQueueFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.ArrayBlockingQueue}
+ */
+public class ArrayBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+	/**
+	 * Capacity used when none provided
+	 */
+	private int defaultCapacity=50;
+	/**
+	 * Lock fairness. null means default fairness
+	 */
+	private Boolean fair;
+	/**
+	 * @return Default array capacity
+	 */
+	public int getDefaultCapacity() {
+		return defaultCapacity;
+	}
+
+	/**
+	 * @param defaultCapacity Default array capacity
+	 */
+	public void setDefaultCapacity(int defaultCapacity) {
+		this.defaultCapacity = defaultCapacity;
+	}
+
+	/**
+	 * @return Lock fairness
+	 */
+	public boolean isFair() {
+		return fair;
+	}
+
+	/**
+	 * @param fair Lock fairness
+	 */
+	public void setFair(boolean fair) {
+		this.fair = fair;
+	}
+	
+    @Override
+    public ArrayBlockingQueue<E> create() {
+        return create(defaultCapacity);
+    }
+
+    @Override
+    public ArrayBlockingQueue<E> create(int capacity) {
+        return fair == null ? 
+				new ArrayBlockingQueue<E>(defaultCapacity) :
+				new ArrayBlockingQueue<E>(defaultCapacity, fair) ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
new file mode 100644
index 0000000..0d69433
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/BlockingQueueFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.BlockingQueue;
+import org.apache.camel.Exchange;
+
+/**
+ * Factory of {@link java.util.concurrent.BlockingQueue}
+ * @param <E> Element type, usually {@link Exchange}
+ */
+public interface BlockingQueueFactory<E> {
+    /**
+     * Create a new {@link java.util.concurrent.BlockingQueue} with default capacity
+     * @return New {@link java.util.concurrent.BlockingQueue}
+     */
+    BlockingQueue<E> create();
+    /**
+     * Create a new {@link java.util.concurrent.BlockingQueue} with given capacity
+     * @return New {@link java.util.concurrent.BlockingQueue}
+     */
+    BlockingQueue<E> create(int capacity);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
new file mode 100644
index 0000000..096cd5b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/LinkedBlockingQueueFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.LinkedBlockingQueue}
+ */
+public class LinkedBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+    @Override
+    public LinkedBlockingQueue<E> create() {
+        return new LinkedBlockingQueue<E>();
+    }
+
+    @Override
+    public LinkedBlockingQueue<E> create(int capacity) {
+        return new LinkedBlockingQueue<E>(capacity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
new file mode 100644
index 0000000..da90d16
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/PriorityBlockingQueueFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.Comparator;
+
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Implementation of {@link BlockingQueueFactory} producing {@link java.util.concurrent.PriorityBlockingQueue}
+ */
+public class PriorityBlockingQueueFactory<E> implements BlockingQueueFactory<E> {
+	/**
+	 * Comparator used to sort exchanges
+	 */
+	private Comparator<E> comparator;
+
+	public Comparator<E> getComparator() {
+		return comparator;
+	}
+
+	public void setComparator(Comparator<E> comparator) {
+		this.comparator = comparator;
+	}
+	
+    @Override
+    public PriorityBlockingQueue<E> create() {
+        return comparator==null ?
+				new PriorityBlockingQueue<E>() :
+				// PriorityQueue as a default capacity of 11
+				new PriorityBlockingQueue<E>(11, comparator);
+    }
+
+    @Override
+    public PriorityBlockingQueue<E> create(int capacity) {
+        return comparator==null?
+				new PriorityBlockingQueue<E>(capacity):
+				new PriorityBlockingQueue<E>(capacity, comparator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index e6d5171..b13dd64 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.seda;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -39,7 +38,7 @@ public class SedaComponent extends UriEndpointComponent {
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
     private final Map<String, QueueReference> queues = new HashMap<String, QueueReference>();
-
+    private BlockingQueueFactory<Exchange> defaultQueueFactory =new LinkedBlockingQueueFactory<Exchange>();
     public SedaComponent() {
         super(SedaEndpoint.class);
     }
@@ -60,15 +59,30 @@ public class SedaComponent extends UriEndpointComponent {
         return defaultConcurrentConsumers;
     }
 
+	public BlockingQueueFactory<Exchange> getDefaultQueueFactory() {
+		return defaultQueueFactory;
+	}
+
+	public void setDefaultQueueFactory(BlockingQueueFactory<Exchange> defaultQueueFactory) {
+		this.defaultQueueFactory = defaultQueueFactory;
+	}
+
     /**
-     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)}
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)}
      */
     @Deprecated
     public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
         return getOrCreateQueue(uri, size, null);
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean, BlockingQueueFactory)}
+     */
     public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) {
+        return getOrCreateQueue(uri, size, multipleConsumers, null);
+    }
+
+    public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers, BlockingQueueFactory customQueueFactory) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -91,14 +105,15 @@ public class SedaComponent extends UriEndpointComponent {
 
         // create queue
         BlockingQueue<Exchange> queue;
+        BlockingQueueFactory<Exchange> queueFactory = customQueueFactory == null ? defaultQueueFactory : customQueueFactory;
         if (size != null && size > 0) {
-            queue = new LinkedBlockingQueue<Exchange>(size);
+            queue = queueFactory.create(size);
         } else {
             if (getQueueSize() > 0) {
                 size = getQueueSize();
-                queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
+                queue = queueFactory.create(getQueueSize());
             } else {
-                queue = new LinkedBlockingQueue<Exchange>();
+                queue = queueFactory.create();
             }
         }
         log.debug("Created queue {} with size {}", key, size);
@@ -127,8 +142,17 @@ public class SedaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-        // defer creating queue till endpoint is started, so we pass in null
-        SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers);
+		// Resolve queue reference
+		BlockingQueue<Exchange> queue=resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
+		SedaEndpoint answer;
+		// Resolve queue factory when no queue specified
+		if (queue == null) {
+			BlockingQueueFactory<Exchange> queueFactory=resolveAndRemoveReferenceParameter(parameters, "queueFactory", BlockingQueueFactory.class);
+			// defer creating queue till endpoint is started, so we pass the queue factory
+			answer = new SedaEndpoint(uri, this, queueFactory, consumers);			
+		} else {
+			answer = new SedaEndpoint(uri, this, queue, consumers);
+		}
         answer.configureProperties(parameters);
         return answer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 656736c..d806d60 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -79,8 +78,10 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     private int pollTimeout = 1000;
     @UriParam
     private boolean purgeWhenStopping;
+    private BlockingQueueFactory<Exchange> queueFactory;
 
     public SedaEndpoint() {
+		queueFactory = new LinkedBlockingQueueFactory<Exchange>();
     }
 
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
@@ -88,11 +89,21 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
-        super(endpointUri, component);
+		this(endpointUri, component, concurrentConsumers);
         this.queue = queue;
         if (queue != null) {
             this.size = queue.remainingCapacity();
         }
+		queueFactory = new LinkedBlockingQueueFactory<Exchange>();
+	}
+	
+    public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, int concurrentConsumers) {
+		this(endpointUri, component, concurrentConsumers);
+		this.queueFactory = queueFactory;
+	}
+	
+    private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) {
+        super(endpointUri, component);
         this.concurrentConsumers = concurrentConsumers;
     }
 
@@ -130,7 +141,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (getComponent() != null) {
                 // use null to indicate default size (= use what the existing queue has been configured with)
                 Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
-                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers());
+                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers(), queueFactory);
                 queue = ref.getQueue();
                 String key = getComponent().getQueueKey(getEndpointUri());
                 LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});
@@ -149,9 +160,9 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
     protected BlockingQueue<Exchange> createQueue() {
         if (size > 0) {
-            return new LinkedBlockingQueue<Exchange>(size);
+            return queueFactory.create(size);
         } else {
-            return new LinkedBlockingQueue<Exchange>();
+            return queueFactory.create();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
new file mode 100644
index 0000000..033ce80
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueFactoryTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2013 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import static junit.framework.TestCase.assertEquals;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import static org.apache.camel.TestSupport.assertIsInstanceOf;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ *
+ */
+public class SedaQueueFactoryTest extends ContextTestSupport {
+	private final ArrayBlockingQueueFactory<Exchange> arrayQueueFactory=new ArrayBlockingQueueFactory<Exchange>();
+
+	@Override
+	protected CamelContext createCamelContext() throws Exception {
+		SimpleRegistry simpleRegistry=new SimpleRegistry();
+		simpleRegistry.put("arrayQueueFactory", arrayQueueFactory);
+        return new DefaultCamelContext(simpleRegistry);
+	}
+	
+    @SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue?queueFactory=#arrayQueueFactory", SedaEndpoint.class);
+		
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+    }	
+
+	@SuppressWarnings("unchecked")
+    public void testArrayBlockingQueueFactoryAndSize() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:arrayQueue50?queueFactory=#arrayQueueFactory&size=50", SedaEndpoint.class);
+		
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        ArrayBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(ArrayBlockingQueue.class, queue);
+		assertEquals("remainingCapacity", 50, blockingQueue.remainingCapacity());
+    }	
+
+	@SuppressWarnings("unchecked")
+    public void testDefaultBlockingQueueFactory() throws Exception {
+        SedaEndpoint endpoint = resolveMandatoryEndpoint("seda:linkedQueue", SedaEndpoint.class);
+		
+        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        LinkedBlockingQueue<Exchange> blockingQueue = assertIsInstanceOf(LinkedBlockingQueue.class, queue);
+    }	
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/276aa87e/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
index bf30cdc..54ccdd5 100644
--- a/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.camel.component.seda;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.hamcrest.CoreMatchers;
+import org.junit.matchers.JUnitMatchers;
 
 /**
  * @version 
@@ -34,6 +41,21 @@ public class SedaQueueTest extends ContextTestSupport {
         template.sendBody("seda:foo?concurrentConsumers=5", "Goodday World");
         template.sendBody("seda:bar", "Bar");
     }
+    public void testQueueRef() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:array?queue=#arrayQueue", "Hello World");
+		
+		SedaEndpoint sedaEndpoint=resolveMandatoryEndpoint("seda:array?queue=#arrayQueue", SedaEndpoint.class);
+		assertTrue(sedaEndpoint.getQueue() instanceof ArrayBlockingQueue);
+    }
+	@Override
+	protected CamelContext createCamelContext() throws Exception {
+		SimpleRegistry simpleRegistry=new SimpleRegistry();
+		simpleRegistry.put("arrayQueue", new ArrayBlockingQueue<Exchange>(10));
+        return new DefaultCamelContext(simpleRegistry);
+	}
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -43,6 +65,8 @@ public class SedaQueueTest extends ContextTestSupport {
                 from("seda:foo?size=20&concurrentConsumers=2").to("mock:result");
 
                 from("seda:bar").to("mock:result");
+
+				from("seda:array?queue=#arrayQueue").to("mock:result");
             }
         };
     }


Re: [3/3] git commit: CAMEL-6597 Camel conduit should support the JAXWS Async API out of box

Posted by Daniel Kulp <dk...@apache.org>.
On Aug 1, 2013, at 11:35 PM, ningjiang@apache.org wrote:

> +            } else {
> +                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
> +                    .getExtension(WorkQueueManager.class);
> +                AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
> +                if (qu == null) {
> +                    qu = mgr.getAutomaticWorkQueue();
> +                }
> +                // need to set the time out somewhere
> +                qu.execute(runnable);
> +            } 

Why "nmr-conduit"?    Can we just use "camel-cxf-conduit" or something that doesn't have JBI-isms in it?


-- 
Daniel Kulp
dkulp@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com


[3/3] git commit: CAMEL-6597 Camel conduit should support the JAXWS Async API out of box

Posted by ni...@apache.org.
CAMEL-6597 Camel conduit should support the JAXWS Async API out of box


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f58ccf9b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f58ccf9b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f58ccf9b

Branch: refs/heads/master
Commit: f58ccf9bd5261549dad2f1c13ab35339fa715c72
Parents: 7853023
Author: Willem Jiang <ni...@apache.org>
Authored: Fri Aug 2 10:35:28 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Fri Aug 2 11:33:33 2013 +0800

----------------------------------------------------------------------
 .../cxf/transport/CamelOutputStream.java        | 94 +++++++++++++++++---
 .../cxf/transport/JaxWSCamelConduitTest.java    | 14 +++
 .../cxf/transport/JaxWSCamelTestSupport.java    | 31 ++++++-
 3 files changed, 125 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
index bd1cf62..6cc1bb1 100644
--- a/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
+++ b/components/camel-cxf-transport/src/main/java/org/apache/camel/component/cxf/transport/CamelOutputStream.java
@@ -18,16 +18,24 @@ package org.apache.camel.component.cxf.transport;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
 import org.apache.camel.component.cxf.common.header.CxfHeaderHelper;
 import org.apache.camel.component.cxf.common.message.CxfMessageHelper;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.cxf.Bus;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,12 +45,13 @@ class CamelOutputStream extends CachedOutputStream {
     /**
      * 
      */
-    private Message outMessage;
+    private final Message outMessage;
     private boolean isOneWay;
     private String targetCamelEndpointUri;
     private Producer producer;
     private HeaderFilterStrategy headerFilterStrategy;
     private MessageObserver observer;
+    private boolean hasLoggedAsyncWarning;
 
     public CamelOutputStream(String targetCamelEndpointUri, Producer producer, 
                              HeaderFilterStrategy headerFilterStrategy, MessageObserver observer, 
@@ -60,6 +69,7 @@ class CamelOutputStream extends CachedOutputStream {
 
     protected void doClose() throws IOException {
         isOneWay = outMessage.getExchange().isOneWay();
+        
         commitOutputMessage();
     }
 
@@ -76,7 +86,7 @@ class CamelOutputStream extends CachedOutputStream {
             pattern = ExchangePattern.InOut;
         }
         LOG.debug("send the message to endpoint {}", this.targetCamelEndpointUri);
-        org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
+        final org.apache.camel.Exchange exchange = this.producer.createExchange(pattern);
 
         exchange.setProperty(Exchange.TO_ENDPOINT, this.targetCamelEndpointUri);
         CachedOutputStream outputStream = (CachedOutputStream) outMessage.getContent(OutputStream.class);
@@ -86,31 +96,89 @@ class CamelOutputStream extends CachedOutputStream {
         // TODO support different encoding
         exchange.getIn().setBody(outputStream.getInputStream());
         LOG.debug("template sending request: ", exchange.getIn());
-        Exception exception;
+        
+        if (outMessage.getExchange().isSynchronous()) {
+            syncInvoke(exchange);
+        } else {
+            // submit the request to the work queue
+            asyncInvokeFromWorkQueue(exchange);
+        }
+
+    }
+    
+    protected void syncInvoke(org.apache.camel.Exchange exchange) throws IOException {
         try {
             this.producer.process(exchange);
         } catch (Exception ex) {
-            exception = ex;
+            exchange.setException(ex);
         }
         // Throw the exception that the template get
-        exception = exchange.getException();
+        Exception exception = exchange.getException();
         if (exception != null) {
             throw new IOException("Cannot send the request message.", exchange.getException());
         }
         exchange.setProperty(CamelTransportConstants.CXF_EXCHANGE, outMessage.getExchange());
         if (!isOneWay) {
-            handleResponse(exchange);
+            handleResponseInternal(exchange);
         }
-
+        
     }
-
-    private void handleResponse(org.apache.camel.Exchange exchange) throws IOException {
-        org.apache.cxf.message.Message inMessage = null;
+     
+    protected void asyncInvokeFromWorkQueue(final org.apache.camel.Exchange exchange) throws IOException {
+        Runnable runnable = new Runnable() {
+            public void run() {
+                try {
+                    syncInvoke(exchange);
+                } catch (Throwable e) {
+                    ((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
+                    outMessage.setContent(Exception.class, e);
+                    ((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
+                    MessageObserver mo = outMessage.getInterceptorChain().getFaultObserver();
+                    if (mo == null) {
+                        mo = outMessage.getExchange().get(MessageObserver.class);
+                    }
+                    mo.onMessage(outMessage);
+                }
+            }
+        };
+        
         try {
-            inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
-        } catch (Exception ex) {
-            throw new IOException("Cannot get the response message. ", ex);
+            Executor ex = outMessage.getExchange().get(Executor.class);
+            if (ex != null) {
+                final Executor ex2 = ex;
+                final Runnable origRunnable = runnable;
+                runnable = new Runnable() {
+                    public void run() {
+                        outMessage.getExchange().put(Executor.class.getName() 
+                                                     + ".USING_SPECIFIED", Boolean.TRUE);
+                        ex2.execute(origRunnable);
+                    }
+                };
+            } else {
+                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
+                    .getExtension(WorkQueueManager.class);
+                AutomaticWorkQueue qu = mgr.getNamedWorkQueue("nmr-conduit");
+                if (qu == null) {
+                    qu = mgr.getAutomaticWorkQueue();
+                }
+                // need to set the time out somewhere
+                qu.execute(runnable);
+            } 
+        } catch (RejectedExecutionException rex) {
+            if (!hasLoggedAsyncWarning) {
+                LOG.warn("Executor rejected background task to retrieve the response.  Suggest increasing the workqueue settings.");
+                hasLoggedAsyncWarning = true;
+            }
+            LOG.info("Executor rejected background task to retrieve the response, running on current thread.");
+            syncInvoke(exchange);
         }
+    }
+
+    private void handleResponseInternal(org.apache.camel.Exchange exchange) {
+        org.apache.cxf.message.Message inMessage = null;
+        inMessage = CxfMessageHelper.getCxfInMessage(this.headerFilterStrategy, exchange, true);
         this.observer.onMessage(inMessage);
     }
+    
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
index 5f3922b..788fbed 100644
--- a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
+++ b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelConduitTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.component.cxf.transport;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
+
 import static org.hamcrest.CoreMatchers.is;
 
 /**
@@ -68,4 +72,14 @@ public class JaxWSCamelConduitTest extends JaxWSCamelTestSupport {
     public void testStart3() {
         assertThat(getSampleWS("direct:start3").getSomething(), is("Something"));
     }
+    
+    @Test
+    public void testAsyncInvocation() throws InterruptedException, ExecutionException {
+        
+        Future<?> result = getSampleWSAsyncWithCXFAPI("direct:start2").getSomethingAsync();
+        // as the CXF will build the getSomethingResponse by using asm, so we cannot get the response directly.
+        assertNotNull(result.get());
+        
+       
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f58ccf9b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
index f1ac596..3fe416a 100644
--- a/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
+++ b/components/camel-cxf-transport/src/test/java/org/apache/camel/component/cxf/transport/JaxWSCamelTestSupport.java
@@ -16,16 +16,22 @@
  */
 package org.apache.camel.component.cxf.transport;
 
+import java.util.concurrent.Future;
+
 import javax.jws.WebMethod;
+import javax.jws.WebParam;
 import javax.jws.WebResult;
 import javax.jws.WebService;
 import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
 import javax.xml.ws.Service;
 
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
+import org.apache.cxf.feature.Feature;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.junit.Before;
 
@@ -51,9 +57,24 @@ public class JaxWSCamelTestSupport extends CamelTestSupport {
     @WebService(targetNamespace = "urn:test", serviceName = "testService", portName = "testPort")
     public interface SampleWS {
 
-        @WebMethod
+        @WebMethod(operationName = "getSomething")
         @WebResult(name = "result", targetNamespace = "urn:test")
         String getSomething();
+       
+    }
+    
+    @WebService(targetNamespace = "urn:test", serviceName = "testService", portName = "testPort")
+    public interface SampleWSAsync {
+        @WebMethod(operationName = "getSomething")
+        @WebResult(name = "result", targetNamespace = "urn:test")
+        String getSomething();
+        
+        @WebMethod(operationName = "getSomething")
+        Response<String> getSomethingAsync();
+        
+        @WebMethod(operationName = "getSomething")
+        Future<?> getSomethingAsync(@WebParam(name = "asyncHandler", targetNamespace = "")
+            AsyncHandler<String> asyncHandler);
     }
     
     public static class SampleWSImpl implements SampleWS {
@@ -99,6 +120,14 @@ public class JaxWSCamelTestSupport extends CamelTestSupport {
         return factory.create(SampleWS.class);
     }
     
+    public SampleWSAsync getSampleWSAsyncWithCXFAPI(String camelEndpoint) {
+        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setAddress("camel://" + camelEndpoint);
+        factory.setServiceClass(SampleWSAsync.class);
+        factory.setBus(bus);
+        return factory.create(SampleWSAsync.class);
+    }
+    
     /**
      * Create a SampleWS Server to a specified route
      * @param camelEndpoint