You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/11/30 10:19:14 UTC
[camel] 01/05: CAMEL-12964: Initial import of jBPM CamelWIH and
Command.
This is an automated email from the ASF dual-hosted git repository.
dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 128f84189a366db43a47461353c2ccaa8d8f658c
Author: Duncan Doyle <Du...@gmail.com>
AuthorDate: Wed Nov 28 13:50:22 2018 +0100
CAMEL-12964: Initial import of jBPM CamelWIH and Command.
---
components/camel-jbpm/pom.xml | 27 ++++
.../component/jbpm/workitem/CamelCommand.java | 121 ++++++++++++++++++
.../jbpm/workitem/CamelWorkItemHandler.java | 137 +++++++++++++++++++++
.../component/jbpm/workitem/CamelCommandTest.java | 134 ++++++++++++++++++++
.../jbpm/workitem/CamelWorkItemHandlerTest.java | 137 +++++++++++++++++++++
5 files changed, 556 insertions(+)
diff --git a/components/camel-jbpm/pom.xml b/components/camel-jbpm/pom.xml
index a0536f0a..7927c7c 100644
--- a/components/camel-jbpm/pom.xml
+++ b/components/camel-jbpm/pom.xml
@@ -92,6 +92,15 @@
<optional>true</optional>
</dependency>
+ <!-- jBPM WorkItems -->
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-workitems-core</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
@@ -128,6 +137,24 @@
<artifactId>camel-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-workitems-core</artifactId>
+ <version>${jbpm-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>xml-apis</groupId>
+ <artifactId>xml-apis</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
new file mode 100644
index 0000000..80c767e
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.executor.Command;
+import org.kie.api.executor.CommandContext;
+import org.kie.api.executor.ExecutionResults;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.internal.runtime.Cacheable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel jBPM {@link Command} which allows to call Camel routes with a <code>direct</code> endpoint.
+ * <p/>
+ * The command passes the {@link WorkItem} retrieved from the {@link CommandContext} to the route that has a consumer on the endpoint-id
+ * that can be passed with the <code>camel-endpoint-id</code> {@link WorkItem} parameter. E.g. when the value "myCamelEndpoint" is passed to the
+ * {@link WorkItem} via the <code>camel-endpoint-id</code> parameter, this {@link Command} will send the {@link WorkItem} to
+ * the Camel URI <code>direct://myCamelEndpoint</code>.
+ * <p/>
+ * The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response
+ * {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message headers
+ * and attachments.
+ * <p/>
+ * This {@link Command} can be constructed in 2 ways. When using the default constructor, the {@link Command} will try to find
+ * the global KIE {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}.
+ * When the {@link RuntimeManager} is passed to the constructor, the {@link Command} will retrieve and use the {@link CamelContext} bound
+ * to the {@link RuntimeManager} from the {@link ServiceRegistry}.
+ */
+public class CamelCommand implements Command, Cacheable {
+
+    private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+    private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+
+    private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
+    private static final String RESPONSE_PARAM = "response";
+    // Result key for the raw response Message; "message" is the key documented in the class Javadoc and used by CamelWorkItemHandler.
+    private static final String MESSAGE_PARAM = "message";
+
+    private static final Logger logger = LoggerFactory.getLogger(CamelCommand.class);
+
+    private final ProducerTemplate producerTemplate;
+
+    /**
+     * Default constructor. Creates a {@link ProducerTemplate} for the {@link CamelContext} registered under the global service key.
+     */
+    public CamelCommand() {
+        CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        this.producerTemplate = globalCamelContext.createProducerTemplate();
+    }
+
+    /**
+     * Creates a {@link ProducerTemplate} for the {@link CamelContext} registered under the identifier of the given {@link RuntimeManager}.
+     */
+    public CamelCommand(RuntimeManager runtimeManager) {
+        String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
+        CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        this.producerTemplate = runtimeCamelContext.createProducerTemplate();
+    }
+
+    /**
+     * Sends the {@link WorkItem} found in the context under <code>workItem</code> to <code>direct://&lt;camel-endpoint-id&gt;</code>.
+     * @param ctx the command context holding the {@link WorkItem}
+     * @return results holding the response body (<code>response</code>) and the raw {@link Message} (<code>message</code>)
+     * @throws Exception the exception set on the returned {@link Exchange}, if any
+     */
+    @Override
+    public ExecutionResults execute(CommandContext ctx) throws Exception {
+        WorkItem workItem = (WorkItem) ctx.getData("workItem");
+        String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
+
+        // We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
+        String camelUri = "direct://" + camelEndpointId;
+
+        Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
+        Exchange outExchange = producerTemplate.send(camelUri, inExchange);
+        // producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
+        if (outExchange.getException() != null) {
+            throw outExchange.getException();
+        }
+        Message outMessage = outExchange.getOut();
+        ExecutionResults results = new ExecutionResults();
+        results.setData(RESPONSE_PARAM, outMessage.getBody());
+        results.setData(MESSAGE_PARAM, outMessage);
+        return results;
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.producerTemplate.stop();
+        } catch (Exception e) {
+            logger.warn("Error encountered while closing the Camel Producer Template.", e);
+            // Not much we can do here, so swallowing exception.
+        }
+    }
+}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
new file mode 100644
index 0000000..7734dd9
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
+import org.jbpm.process.workitem.core.util.Wid;
+import org.jbpm.process.workitem.core.util.WidMavenDepends;
+import org.jbpm.process.workitem.core.util.WidParameter;
+import org.jbpm.process.workitem.core.util.WidResult;
+import org.jbpm.process.workitem.core.util.service.WidAction;
+import org.jbpm.process.workitem.core.util.service.WidService;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.api.runtime.process.WorkItemManager;
+import org.kie.internal.runtime.Cacheable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel jBPM {@link org.kie.api.runtime.process.WorkItemHandler} which allows to call Camel routes with a <code>direct</code> endpoint.
+ * <p/>
+ * The handler passes the {@link WorkItem} to the route that has a consumer on the endpoint-id that can be passed with the
+ * <code>camel-endpoint-id</code> {@link WorkItem} parameter. E.g. when the value "myCamelEndpoint" is passed to the
+ * {@link WorkItem} via the <code>camel-endpoint-id</code> parameter, this handler will send the {@link WorkItem} to the Camel URI
+ * <code>direct://myCamelEndpoint</code>.
+ * <p/>
+ * The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response
+ * {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message
+ * headers and attachments.
+ * <p/>
+ * This handler can be constructed in 2 ways. When using the default constructor, the handler will try to find the global KIE
+ * {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}. When the {@link RuntimeManager} is passed to the constructor,
+ * the handler will retrieve and use the {@link CamelContext} bound to the {@link RuntimeManager} from the {@link ServiceRegistry}.
+ */
+@Wid(widfile = "CamelConnector.wid", name = "CamelConnector", displayName = "CamelConnector", defaultHandler = "mvel: new org.apache.camel.component.jbpm.workitem.CamelWorkItemHandler()", documentation = "${artifactId}/index.html", parameters = {
+        @WidParameter(name = "camel-endpoint-id") }, results = { @WidResult(name = "response"),
+        @WidResult(name = "message") }, mavenDepends = {
+        @WidMavenDepends(group = "${groupId}", artifact = "${artifactId}", version = "${version}") }, serviceInfo = @WidService(category = "${name}", description = "${description}", keywords = "apache,camel,payload,route,connector", action = @WidAction(title = "Send payload to a Camel endpoint")))
+public class CamelWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
+
+    private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+    private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+
+    private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
+    private static final String RESPONSE_PARAM = "response";
+    private static final String MESSAGE_PARAM = "message";
+
+    private static final Logger logger = LoggerFactory.getLogger(CamelWorkItemHandler.class);
+
+    private final ProducerTemplate producerTemplate;
+
+    /**
+     * Default Constructor. This creates a {@link ProducerTemplate} for the global {@link CamelContext}.
+     */
+    public CamelWorkItemHandler() {
+        CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        this.producerTemplate = globalCamelContext.createProducerTemplate();
+    }
+
+    /**
+     * Constructor which accepts {@link RuntimeManager}. This causes this WorkItemHandler to create a {@link ProducerTemplate} for the
+     * runtime specific {@link CamelContext}.
+     */
+    public CamelWorkItemHandler(RuntimeManager runtimeManager) {
+        String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
+        CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        this.producerTemplate = runtimeCamelContext.createProducerTemplate();
+    }
+
+    /**
+     * Sends the {@link WorkItem} to <code>direct://&lt;camel-endpoint-id&gt;</code> and completes it with the <code>response</code> and <code>message</code> results.
+     */
+    @Override
+    public void executeWorkItem(WorkItem workItem, final WorkItemManager manager) {
+        String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
+        // We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
+        String camelUri = "direct://" + camelEndpointId;
+        try {
+            Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
+            Exchange outExchange = producerTemplate.send(camelUri, inExchange);
+            // producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
+            if (outExchange.getException() != null) {
+                throw outExchange.getException();
+            }
+            Message outMessage = outExchange.getOut();
+
+            Map<String, Object> result = new HashMap<>();
+            result.put(RESPONSE_PARAM, outMessage.getBody());
+            result.put(MESSAGE_PARAM, outMessage);
+
+            manager.completeWorkItem(workItem.getId(), result);
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    @Override
+    public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
+        // Do nothing, cannot be aborted
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.producerTemplate.stop();
+        } catch (Exception e) {
+            logger.warn("Error encountered while closing the Camel Producer Template.", e);
+            // Not much we can do here, so swallowing exception.
+        }
+    }
+}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
new file mode 100644
index 0000000..543b833
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.any;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.executor.CommandContext;
+import org.kie.api.executor.ExecutionResults;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CamelCommandTest {
+
+    @Mock
+    ProducerTemplate producerTemplate;
+
+    @Mock
+    Exchange outExchange;
+
+    @Mock
+    Message outMessage;
+
+    @Mock
+    CamelContext camelContext;
+
+    @Mock
+    RuntimeManager runtimeManager;
+
+    @Mock
+    CommandContext commandContext;
+
+    /**
+     * The no-arg constructor must use the CamelContext registered under "GlobalCamelService".
+     */
+    @Test
+    public void testExecuteComamnd() throws Exception {
+        String camelEndpointId = "testCamelRoute";
+        String camelRouteUri = "direct://" + camelEndpointId;
+
+        String testResponse = "testResponse";
+
+        when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+
+        when(outExchange.getOut()).thenReturn(outMessage);
+        when(outMessage.getBody()).thenReturn(testResponse);
+
+        // Register the global CamelContext that the default constructor looks up.
+        ServiceRegistry.get().register("GlobalCamelService", camelContext);
+
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+
+        when(commandContext.getData(anyString())).thenReturn(workItem);
+
+        CamelCommand command = new CamelCommand();
+        ExecutionResults results = command.execute(commandContext);
+
+        assertNotNull(results);
+        assertEquals(2, results.getData().size());
+        assertEquals(testResponse, results.getData().get("response"));
+        // The raw Camel Message must be handed back as the second result entry.
+        assertTrue(results.getData().containsValue(outMessage));
+    }
+
+    /**
+     * The RuntimeManager constructor must use the CamelContext registered under the manager identifier plus "_CamelService".
+     */
+    @Test
+    public void testExecuteCommandLocalCamelContext() throws Exception {
+        String camelEndpointId = "testCamelRoute";
+        String camelRouteUri = "direct://" + camelEndpointId;
+
+        String testResponse = "testResponse";
+
+        String runtimeManagerId = "testRuntimeManager";
+
+        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+
+        when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+
+        when(outExchange.getOut()).thenReturn(outMessage);
+        when(outMessage.getBody()).thenReturn(testResponse);
+
+        // Register the RuntimeManager bound CamelContext.
+        ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
+
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+
+        when(commandContext.getData(anyString())).thenReturn(workItem);
+
+        CamelCommand command = new CamelCommand(runtimeManager);
+        ExecutionResults results = command.execute(commandContext);
+
+        assertNotNull(results);
+        assertEquals(2, results.getData().size());
+        assertEquals(testResponse, results.getData().get("response"));
+        // The raw Camel Message must be handed back as the second result entry.
+        assertTrue(results.getData().containsValue(outMessage));
+    }
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
new file mode 100644
index 0000000..7872132
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.TestWorkItemManager;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CamelWorkItemHandlerTest {
+
+    @Mock
+    ProducerTemplate producerTemplate;
+
+    @Mock
+    Exchange outExchange;
+
+    @Mock
+    Message outMessage;
+
+    @Mock
+    CamelContext camelContext;
+
+    @Mock
+    RuntimeManager runtimeManager;
+
+    /**
+     * The no-arg constructor must use the CamelContext registered under "GlobalCamelService".
+     */
+    @Test
+    public void testExecuteGlobalCamelContext() throws Exception {
+        String camelEndpointId = "testCamelRoute";
+        String camelRouteUri = "direct://" + camelEndpointId;
+
+        String testResponse = "testResponse";
+
+        when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+
+        when(outExchange.getOut()).thenReturn(outMessage);
+        when(outMessage.getBody()).thenReturn(testResponse);
+
+        // Register the global CamelContext that the default constructor looks up.
+        ServiceRegistry.get().register("GlobalCamelService", camelContext);
+
+        TestWorkItemManager manager = new TestWorkItemManager();
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+
+        CamelWorkItemHandler handler = new CamelWorkItemHandler();
+
+        handler.executeWorkItem(workItem, manager);
+
+        assertNotNull(manager.getResults());
+        assertEquals(1, manager.getResults().size());
+        assertTrue(manager.getResults().containsKey(workItem.getId()));
+        Map<String, Object> results = manager.getResults(workItem.getId());
+        assertEquals(2, results.size());
+        assertEquals(testResponse, results.get("response"));
+        assertSame(outMessage, results.get("message"));
+    }
+
+    /**
+     * The RuntimeManager constructor must use the CamelContext registered under the manager identifier plus "_CamelService".
+     */
+    @Test
+    public void testExecuteLocalCamelContext() throws Exception {
+        String camelEndpointId = "testCamelRoute";
+        String camelRouteUri = "direct://" + camelEndpointId;
+
+        String testResponse = "testResponse";
+
+        String runtimeManagerId = "testRuntimeManager";
+
+        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+
+        when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+
+        when(outExchange.getOut()).thenReturn(outMessage);
+        when(outMessage.getBody()).thenReturn(testResponse);
+
+        // Register the RuntimeManager bound CamelContext.
+        ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
+
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", camelEndpointId);
+        workItem.setParameter("request", "someRequest");
+
+        CamelWorkItemHandler handler = new CamelWorkItemHandler(runtimeManager);
+        TestWorkItemManager manager = new TestWorkItemManager();
+        handler.executeWorkItem(workItem, manager);
+
+        assertNotNull(manager.getResults());
+        assertEquals(1, manager.getResults().size());
+        assertTrue(manager.getResults().containsKey(workItem.getId()));
+        Map<String, Object> results = manager.getResults(workItem.getId());
+        assertEquals(2, results.size());
+        assertEquals(testResponse, results.get("response"));
+        assertSame(outMessage, results.get("message"));
+    }
+}