You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/11/30 10:19:14 UTC

[camel] 01/05: CAMEL-12964: Initial import of jBPM CamelWIH and Command.

This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 128f84189a366db43a47461353c2ccaa8d8f658c
Author: Duncan Doyle <Du...@gmail.com>
AuthorDate: Wed Nov 28 13:50:22 2018 +0100

    CAMEL-12964: Initial import of jBPM CamelWIH and Command.
---
 components/camel-jbpm/pom.xml                      |  27 ++++
 .../component/jbpm/workitem/CamelCommand.java      | 121 ++++++++++++++++++
 .../jbpm/workitem/CamelWorkItemHandler.java        | 137 +++++++++++++++++++++
 .../component/jbpm/workitem/CamelCommandTest.java  | 134 ++++++++++++++++++++
 .../jbpm/workitem/CamelWorkItemHandlerTest.java    | 137 +++++++++++++++++++++
 5 files changed, 556 insertions(+)

diff --git a/components/camel-jbpm/pom.xml b/components/camel-jbpm/pom.xml
index a0536f0a..7927c7c 100644
--- a/components/camel-jbpm/pom.xml
+++ b/components/camel-jbpm/pom.xml
@@ -92,6 +92,15 @@
             <optional>true</optional>
         </dependency>
         
+        <!-- jBPM WorkItems -->
+        <dependency>
+        	<groupId>org.jbpm</groupId>
+        	<artifactId>jbpm-workitems-core</artifactId>
+        	<version>${jbpm-version}</version>
+        	<scope>provided</scope>
+        	<optional>true</optional>
+        </dependency>
+        
         <dependency>
             <groupId>org.jboss.logging</groupId>
             <artifactId>jboss-logging</artifactId>
@@ -128,6 +137,24 @@
             <artifactId>camel-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+        	<groupId>org.mockito</groupId>
+        	<artifactId>mockito-core</artifactId>
+        	<scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jbpm</groupId>
+            <artifactId>jbpm-workitems-core</artifactId>
+            <version>${jbpm-version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+               <exclusion>
+                   <groupId>xml-apis</groupId>
+                   <artifactId>xml-apis</artifactId>
+               </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
new file mode 100644
index 0000000..80c767e
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.executor.Command;
+import org.kie.api.executor.CommandContext;
+import org.kie.api.executor.ExecutionResults;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.internal.runtime.Cacheable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel jBPM {@link Command} which allows to call Camel routes with a <code>direct</code> endpoint.
+ * <p/>
+ * The command passes the {@WorkItem} retrieved from the {@link CommandContext} to the route that has a consumer on the endpoint-id 
+ * that can be passed with the <code>camel-endpoint-id</code> {@link WorkItem} parameter. E.g. when a the value "myCamelEndpoint" is passed to the 
+ * {link WorkItem} via the <code>camel-endpoint-id</code> parameter, this {@link Command} will send the {@link WorkItem} to 
+ * the Camel URI <code>direct://myCamelEndpoint</code>.  
+ * <p/>
+ * The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response 
+ * {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message headers 
+ * and attachments.  
+ * <p/>
+ * This {@link Command} can be constructed in 2 ways. When using the default constructor, the {link Command} will try to find 
+ * the global KIE {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}. 
+ * When the {@link RuntimeManager} is passed to the constructor, the {@link Command} will retrieve and use the {@link CamelContext} bound 
+ * to the {@link RuntimeManage} from the {@link ServiceRegistry}
+ * 
+ */
+public class CamelCommand implements Command,
+                                          Cacheable {
+
+	private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+	private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+	
+	private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
+	private static final String RESPONSE_PARAM = "response";
+	private static final String MESSAGE_PARAM = "out-headers";
+
+	
+    private static final Logger logger = LoggerFactory.getLogger(CamelCommand.class);
+
+	private final ProducerTemplate producerTemplate;
+
+
+	public CamelCommand() {
+		CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+		// TODO: Should we allow to set the maximumCacheSize on the producer?
+		this.producerTemplate = globalCamelContext.createProducerTemplate();
+	}
+	
+	public CamelCommand(RuntimeManager runtimeManager) {
+		String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
+		CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
+		// TODO: Should we allow to set the maximumCacheSize on the producer?
+		this.producerTemplate = runtimeCamelContext.createProducerTemplate();
+	}
+	
+	
+	
+    @Override
+    public ExecutionResults execute(CommandContext ctx) throws Exception {
+        
+    	WorkItem workItem = (WorkItem) ctx.getData("workItem");
+    	
+    	String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
+		
+		// We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
+		String camelUri = "direct://" + camelEndpointId;
+		
+		Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
+		Exchange outExchange = producerTemplate.send(camelUri, inExchange);
+		// producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
+		if (outExchange.getException() != null) {
+			throw outExchange.getException();
+		}
+		Message outMessage = outExchange.getOut();
+		
+		ExecutionResults results = new ExecutionResults();
+		Object response = outMessage.getBody();
+		results.setData(RESPONSE_PARAM, response);
+		results.setData(MESSAGE_PARAM, outMessage);
+    	
+        return results;
+    }
+
+    
+	@Override
+	public void close() {
+		try {
+			this.producerTemplate.stop();
+		} catch (Exception e) {
+			logger.warn("Error encountered while closing the Camel Producer Template.", e);
+			// Not much we can do here, so swallowing exception.
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
new file mode 100644
index 0000000..7734dd9
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
+import org.jbpm.process.workitem.core.util.Wid;
+import org.jbpm.process.workitem.core.util.WidMavenDepends;
+import org.jbpm.process.workitem.core.util.WidParameter;
+import org.jbpm.process.workitem.core.util.WidResult;
+import org.jbpm.process.workitem.core.util.service.WidAction;
+import org.jbpm.process.workitem.core.util.service.WidService;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.api.runtime.process.WorkItemManager;
+import org.kie.internal.runtime.Cacheable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel jBPM {@link WorkItemHandler} which allows to call Camel routes with a <code>direct</code> endpoint.
+ * <p/>
+ * The handler passes the {@WorkItem} to the route that has a consumer on the endpoint-id that can be passed with the
+ * <code>camel-endpoint-id</code>{@link WorkItem} parameter. E.g. when a the value "myCamelEndpoint" is passed to the 
+ * {link WorkItem} via the <code>camel-endpoint-id</code> parameter, this command will send the {@link WorkItem} to the Camel URI
+ * <code>direct://myCamelEndpoint</code>.
+ * <p/>
+ * The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response
+ * {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message
+ * headers and attachments.
+ * <p/>
+ * This handler can be constructed in 2 ways. When using the default constructor, the handler will try to find the global KIE
+ * {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}. When the {@link RuntimeManager} is passed to the constructor,
+ * the handler will retrieve and use the {@link CamelContext} bound to the {@link RuntimeManage} from the {@link ServiceRegistry}
+ * 
+ */
+@Wid(widfile = "CamelConnector.wid", name = "CamelConnector", displayName = "CamelConnector", defaultHandler = "mvel: new org.apache.camel.component.jbpm.workitem.CamelWorkitemHandler()", documentation = "${artifactId}/index.html", parameters = {
+		@WidParameter(name = "camel-endpoint-id") }, results = { @WidResult(name = "response"),
+				@WidResult(name = "message") }, mavenDepends = {
+						@WidMavenDepends(group = "${groupId}", artifact = "${artifactId}", version = "${version}") }, serviceInfo = @WidService(category = "${name}", description = "${description}", keywords = "apache,camel,payload,route,connector", action = @WidAction(title = "Send payload to a Camel endpoint")))
+public class CamelWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
+
+	private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+	private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+
+	private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
+	private static final String RESPONSE_PARAM = "response";
+	private static final String MESSAGE_PARAM = "message";
+
+	private static Logger logger = LoggerFactory.getLogger(CamelWorkItemHandler.class);
+
+	private final ProducerTemplate producerTemplate;
+
+	/**
+	 * Default Constructor. This creates a {@link ProducerTemplate} for the global {@link CamelContext}.
+	 */
+	public CamelWorkItemHandler() {
+		CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+		// TODO: Should we allow to set the maximumCacheSize on the producer?
+		this.producerTemplate = globalCamelContext.createProducerTemplate();
+	}
+
+	/**
+	 * Constructor which accepts {@link RuntimeManager}. This causes this WorkItemHanlder to create a {@link ProducerTemplate} for the
+	 * runtime specific {@link CamelContext}.
+	 */
+	public CamelWorkItemHandler(RuntimeManager runtimeManager) {
+		String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
+		CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
+		// TODO: Should we allow to set the maximumCacheSize on the producer?
+		this.producerTemplate = runtimeCamelContext.createProducerTemplate();
+	}
+
+	public void executeWorkItem(WorkItem workItem, final WorkItemManager manager) {
+
+		String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
+
+		// We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
+		String camelUri = "direct://" + camelEndpointId;
+		try {
+			Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
+			Exchange outExchange = producerTemplate.send(camelUri, inExchange);
+			// producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
+			if (outExchange.getException() != null) {
+				throw outExchange.getException();
+			}
+			Message outMessage = outExchange.getOut();
+
+			Map<String, Object> result = new HashMap<>();
+			Object response = outMessage.getBody();
+			result.put(RESPONSE_PARAM, response);
+			result.put(MESSAGE_PARAM, outMessage);
+
+			manager.completeWorkItem(workItem.getId(), result);
+		} catch (Exception e) {
+			handleException(e);
+		}
+	}
+
+	public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
+		// Do nothing, cannot be aborted
+	}
+
+	@Override
+	public void close() {
+		try {
+			this.producerTemplate.stop();
+		} catch (Exception e) {
+			logger.warn("Error encountered while closing the Camel Producer Template.", e);
+			// Not much we can do here, so swallowing exception.
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
new file mode 100644
index 0000000..543b833
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.any;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.executor.CommandContext;
+import org.kie.api.executor.ExecutionResults;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CamelCommandTest {
+
+	@Mock
+    ProducerTemplate producerTemplate;
+
+    @Mock
+    Exchange outExchange;
+    
+    @Mock
+    Message outMessage;
+    
+    @Mock
+    CamelContext camelContext;
+    
+    @Mock
+    RuntimeManager runtimeManager;
+    
+    @Mock
+    CommandContext commandContext;
+
+    @Test
+    public void testExecuteComamnd() throws Exception {
+    
+    	String camelEndpointId = "testCamelRoute";
+    	String camelRouteUri = "direct://" + camelEndpointId;
+    	
+    	String testReponse = "testResponse";
+    	
+    	String runtimeManagerId = "testRuntimeManager";
+    	
+    	when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+    	
+    	
+    	when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+    	
+    	when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+    	
+    	when(outExchange.getOut()).thenReturn(outMessage);
+    	when(outMessage.getBody()).thenReturn(testReponse);
+    	
+    	//Register the RuntimeManager bound camelcontext.
+    	ServiceRegistry.get().register("GlobalCamelService", camelContext);
+    	
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+        
+        when(commandContext.getData(anyString())).thenReturn(workItem);
+        
+        CamelCommand command = new CamelCommand();
+        ExecutionResults results = command.execute(commandContext);
+        
+        
+        assertNotNull(results);
+        assertEquals(2, results.getData().size());
+        assertEquals(testReponse, results.getData().get("response"));
+    }
+    
+    
+    @Test
+    public void testExecuteCommandLocalCamelContext() throws Exception {
+    
+    	String camelEndpointId = "testCamelRoute";
+    	String camelRouteUri = "direct://" + camelEndpointId;
+    	
+    	String testReponse = "testResponse";
+    	
+    	String runtimeManagerId = "testRuntimeManager";
+    	
+    	when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+    	
+    	when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+    	//when(producerTemplate.send(argThat(not(camelRouteUri)), any(Exchange.class))).thenThrow(new IllegalArgumentException("Unexpected route id"));
+    	when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+    	
+    	when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+    	
+    	when(outExchange.getOut()).thenReturn(outMessage);
+    	when(outMessage.getBody()).thenReturn(testReponse);
+    	
+    	//Register the RuntimeManager bound camelcontext.
+    	ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
+    	
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+        
+        when(commandContext.getData(anyString())).thenReturn(workItem);
+        
+        CamelCommand command = new CamelCommand(runtimeManager);
+        ExecutionResults results = command.execute(commandContext);
+        
+        assertNotNull(results);
+        assertEquals(2, results.getData().size());
+        assertEquals(testReponse, results.getData().get("response"));
+    }
+
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
new file mode 100644
index 0000000..7872132
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.TestWorkItemManager;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CamelWorkItemHandlerTest {
+
+    @Mock
+    ProducerTemplate producerTemplate;
+
+    @Mock
+    Exchange outExchange;
+    
+    @Mock
+    Message outMessage;
+    
+    @Mock
+    CamelContext camelContext;
+    
+    @Mock
+    RuntimeManager runtimeManager;
+
+    @Test
+    public void testExecuteGlobalCamelContext() throws Exception {
+    
+    	String camelEndpointId = "testCamelRoute";
+    	String camelRouteUri = "direct://" + camelEndpointId;
+    	
+    	String testReponse = "testResponse";
+    	
+    	when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+    	when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+    	
+    	when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+    	
+    	when(outExchange.getOut()).thenReturn(outMessage);
+    	when(outMessage.getBody()).thenReturn(testReponse);
+    	
+    	ServiceRegistry.get().register("GlobalCamelService", camelContext);
+    	
+    	
+        TestWorkItemManager manager = new TestWorkItemManager();
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+        
+        CamelWorkItemHandler handler = new CamelWorkItemHandler();
+        
+        handler.executeWorkItem(workItem,
+                                manager);
+        assertNotNull(manager.getResults());
+        assertEquals(1,
+                     manager.getResults().size());
+        assertTrue(manager.getResults().containsKey(workItem.getId()));
+        Map<String, Object> results = manager.getResults(workItem.getId());
+        assertEquals(2, results.size());
+        assertEquals(testReponse, results.get("response"));
+    }
+    
+    @Test
+    public void testExecuteLocalCamelContext() throws Exception {
+    
+    	String camelEndpointId = "testCamelRoute";
+    	String camelRouteUri = "direct://" + camelEndpointId;
+    	
+    	String testReponse = "testResponse";
+    	
+    	String runtimeManagerId = "testRuntimeManager";
+    	
+    	when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+    	
+    	when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+       	when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+    	
+    	when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+    	
+    	when(outExchange.getOut()).thenReturn(outMessage);
+    	when(outMessage.getBody()).thenReturn(testReponse);
+    	
+    	//Register the RuntimeManager bound camelcontext.
+    	ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
+    	
+    	
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+        
+        CamelWorkItemHandler handler = new CamelWorkItemHandler(runtimeManager);
+        
+        TestWorkItemManager manager = new TestWorkItemManager();
+        handler.executeWorkItem(workItem,
+                                manager);
+        assertNotNull(manager.getResults());
+        assertEquals(1,
+                     manager.getResults().size());
+        assertTrue(manager.getResults().containsKey(workItem.getId()));
+        Map<String, Object> results = manager.getResults(workItem.getId());
+        assertEquals(2, results.size());
+        assertEquals(testReponse, results.get("response"));
+    }
+
+
+}