You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/11/30 10:19:15 UTC
[camel] 02/05: CamelCommand split into separate commands for the
Camel DeploymentContext and GlobalContext. WorkItemHandler can now be
configured with a default route. WIH supports both InOut and InOnly MEPs.
Added tests that use a test CamelContext and test routes to verify
behaviour on Exceptions.
This is an automated email from the ASF dual-hosted git repository.
dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 89f1070c7bfd6f8d4a71f405bfcc41249723fc46
Author: Duncan Doyle <Du...@gmail.com>
AuthorDate: Thu Nov 29 15:02:15 2018 +0100
CamelCommand split into separate commands for the Camel DeploymentContext and GlobalContext. WorkItemHandler can now be configured with a default route. WIH supports both InOut and InOnly MEPs. Added tests that use a test CamelContext and test routes to verify behaviour on Exceptions.
---
...CamelCommand.java => AbstractCamelCommand.java} | 46 +----
.../workitem/AbstractCamelWorkItemHandler.java | 156 ++++++++++++++++
.../jbpm/workitem/CamelWorkItemHandler.java | 137 --------------
.../workitem/DeploymentContextCamelCommand.java | 73 ++++++++
.../jbpm/workitem/GlobalContextCamelCommand.java | 60 ++++++
.../jbpm/workitem/InOnlyCamelWorkItemHandler.java | 106 +++++++++++
.../jbpm/workitem/InOutCamelWorkItemHandler.java | 119 ++++++++++++
.../CamelWorkItemHandlerIntegrationTests.java | 204 +++++++++++++++++++++
.../jbpm/workitem/CamelWorkItemHandlerTest.java | 137 --------------
.../DeploymentContextCamelCommandTest.java | 95 ++++++++++
...est.java => GlobalContextCamelCommandTest.java} | 65 ++-----
.../workitem/InOnlyCamelWorkItemHandlerTest.java | 97 ++++++++++
.../workitem/InOutCamelWorkItemHandlerTest.java | 147 +++++++++++++++
13 files changed, 1078 insertions(+), 364 deletions(-)
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelCommand.java
similarity index 68%
rename from components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
rename to components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelCommand.java
index 80c767e..3cb1ee4 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelCommand.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelCommand.java
@@ -42,48 +42,24 @@ import org.slf4j.LoggerFactory;
* <p/>
* The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response
* {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message headers
- * and attachments.
- * <p/>
- * This {@link Command} can be constructed in 2 ways. When using the default constructor, the {link Command} will try to find
- * the global KIE {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}.
- * When the {@link RuntimeManager} is passed to the constructor, the {@link Command} will retrieve and use the {@link CamelContext} bound
- * to the {@link RuntimeManage} from the {@link ServiceRegistry}
- *
+ * and attachments.
*/
-public class CamelCommand implements Command,
+public abstract class AbstractCamelCommand implements Command,
Cacheable {
- private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
- private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
-
private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
private static final String RESPONSE_PARAM = "response";
private static final String MESSAGE_PARAM = "out-headers";
-
- private static final Logger logger = LoggerFactory.getLogger(CamelCommand.class);
-
- private final ProducerTemplate producerTemplate;
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCamelCommand.class);
-
- public CamelCommand() {
- CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
- // TODO: Should we allow to set the maximumCacheSize on the producer?
- this.producerTemplate = globalCamelContext.createProducerTemplate();
- }
-
- public CamelCommand(RuntimeManager runtimeManager) {
- String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
- CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
- // TODO: Should we allow to set the maximumCacheSize on the producer?
- this.producerTemplate = runtimeCamelContext.createProducerTemplate();
+ public AbstractCamelCommand() {
}
-
-
@Override
public ExecutionResults execute(CommandContext ctx) throws Exception {
+
WorkItem workItem = (WorkItem) ctx.getData("workItem");
String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
@@ -91,6 +67,7 @@ public class CamelCommand implements Command,
// We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
String camelUri = "direct://" + camelEndpointId;
+ ProducerTemplate producerTemplate = getProducerTemplate(ctx);
Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
Exchange outExchange = producerTemplate.send(camelUri, inExchange);
// producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
@@ -106,16 +83,7 @@ public class CamelCommand implements Command,
return results;
}
-
- @Override
- public void close() {
- try {
- this.producerTemplate.stop();
- } catch (Exception e) {
- logger.warn("Error encountered while closing the Camel Producer Template.", e);
- // Not much we can do here, so swallowing exception.
- }
- }
+ protected abstract ProducerTemplate getProducerTemplate(CommandContext ctx);
}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelWorkItemHandler.java
new file mode 100644
index 0000000..db88e70
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/AbstractCamelWorkItemHandler.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.api.runtime.process.WorkItemHandler;
+import org.kie.api.runtime.process.WorkItemManager;
+import org.kie.internal.runtime.Cacheable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Camel jBPM {@link WorkItemHandler WorkItemHandlers} which call Camel routes with a <code>direct</code> endpoint.
+ * <p/>
+ * The handler passes the {@link WorkItem} to the route that has a consumer on the endpoint-id that can be passed with the
+ * <code>camel-endpoint-id</code> {@link WorkItem} parameter. E.g. when the value "myCamelEndpoint" is passed to the {@link WorkItem} via
+ * the <code>camel-endpoint-id</code> parameter, this handler will send the request to the Camel URI
+ * <code>direct://myCamelEndpoint</code>.
+ * <p/>
+ * The request {@link Exchange} is created by {@link #buildExchange(ProducerTemplate, WorkItem)}. How the response {@link Message} of the
+ * invocation is processed is decided by the subclass in {@link #handleResponse(Exchange, WorkItem, WorkItemManager)}.
+ * <p/>
+ * This handler can be constructed in multiple ways. When you don't pass a {@link RuntimeManager} to the constructor, the handler will try
+ * to find the global KIE {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}. When the {@link RuntimeManager} is passed
+ * to the constructor, the handler will retrieve and use the {@link CamelContext} bound to the {@link RuntimeManager} from the
+ * {@link ServiceRegistry}. When a <code>camel-endpoint-id</code> is passed to the constructor, the handler will send all requests to the
+ * Camel route that is consuming from that endpoint, unless the endpoint is overridden by passing the <code>camel-endpoint-id</code> in
+ * the {@link WorkItem} parameters.
+ */
+public abstract class AbstractCamelWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
+
+    private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+    private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+
+    private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCamelWorkItemHandler.class);
+
+    /** Template used to send exchanges; created once per handler and stopped in {@link #close()}. */
+    private final ProducerTemplate producerTemplate;
+
+    /** Default endpoint id configured on the handler; may be <code>null</code> or empty when none is configured. */
+    private final String camelEndpointId;
+
+    /**
+     * Default Constructor. This creates a {@link ProducerTemplate} for the global {@link CamelContext} and configures no default endpoint.
+     */
+    public AbstractCamelWorkItemHandler() {
+        this("");
+    }
+
+    /**
+     * Constructor which creates a {@link ProducerTemplate} for the global {@link CamelContext}.
+     *
+     * @param camelEndpointId the endpoint id used when the {@link WorkItem} does not carry a <code>camel-endpoint-id</code> parameter
+     */
+    public AbstractCamelWorkItemHandler(String camelEndpointId) {
+        this.producerTemplate = createProducerTemplate(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+        this.camelEndpointId = camelEndpointId;
+    }
+
+    /**
+     * Constructor which accepts {@link RuntimeManager}. This causes this WorkItemHandler to create a {@link ProducerTemplate} for the
+     * runtime specific {@link CamelContext}.
+     */
+    public AbstractCamelWorkItemHandler(RuntimeManager runtimeManager) {
+        this(runtimeManager, "");
+    }
+
+    /**
+     * Constructor which creates a {@link ProducerTemplate} for the {@link CamelContext} registered under the identifier of the given
+     * {@link RuntimeManager}.
+     *
+     * @param runtimeManager the manager whose identifier selects the runtime specific {@link CamelContext}
+     * @param camelEndpointId the endpoint id used when the {@link WorkItem} does not carry a <code>camel-endpoint-id</code> parameter
+     */
+    public AbstractCamelWorkItemHandler(RuntimeManager runtimeManager, String camelEndpointId) {
+        this.producerTemplate = createProducerTemplate(runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX);
+        this.camelEndpointId = camelEndpointId;
+    }
+
+    /**
+     * Looks up the {@link CamelContext} registered under the given key in the {@link ServiceRegistry} and creates a template for it.
+     */
+    private static ProducerTemplate createProducerTemplate(String camelContextServiceKey) {
+        CamelContext camelContext = (CamelContext) ServiceRegistry.get().service(camelContextServiceKey);
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        return camelContext.createProducerTemplate();
+    }
+
+    /**
+     * Sends the request {@link Exchange} to the <code>direct</code> endpoint resolved for the {@link WorkItem}. Exceptions raised while
+     * building, sending or handling the exchange are passed to <code>handleException</code>.
+     *
+     * @throws IllegalArgumentException when no endpoint id is configured on either the handler or the {@link WorkItem}
+     */
+    @Override
+    public void executeWorkItem(WorkItem workItem, final WorkItemManager manager) {
+        String workItemCamelEndpointId = getCamelEndpointId(workItem);
+
+        // We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
+        String camelUri = "direct://" + workItemCamelEndpointId;
+
+        try {
+            Exchange requestExchange = buildExchange(producerTemplate, workItem);
+            logger.debug("Sending Camel Exchange to: {}", camelUri);
+            Exchange responseExchange = producerTemplate.send(camelUri, requestExchange);
+            // producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
+            if (responseExchange.getException() != null) {
+                throw responseExchange.getException();
+            }
+            handleResponse(responseExchange, workItem, manager);
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    /**
+     * Resolves the endpoint id for the given {@link WorkItem}. A non-empty <code>camel-endpoint-id</code> parameter on the {@link WorkItem}
+     * takes precedence over the endpoint id configured on this handler.
+     */
+    private String getCamelEndpointId(WorkItem workItem) {
+        String workItemCamelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
+        boolean configuredOnWorkItem = workItemCamelEndpointId != null && !workItemCamelEndpointId.isEmpty();
+        boolean configuredOnHandler = camelEndpointId != null && !camelEndpointId.isEmpty();
+
+        if (configuredOnWorkItem) {
+            if (configuredOnHandler) {
+                logger.debug(
+                        "The Camel Endpoint ID has been set on both the WorkItemHandler and WorkItem. The camel-endpoint-id configured on the WorkItem overrides the global configuration.");
+            }
+            return workItemCamelEndpointId;
+        }
+        if (configuredOnHandler) {
+            return camelEndpointId;
+        }
+        throw new IllegalArgumentException(
+                "No Camel Endpoint ID specified. Please configure the 'camel-endpoint-id' in either the constructor of this WorkItemHandler, or pass it via the '"
+                        + CAMEL_ENDPOINT_ID_PARAM + "' WorkItem parameter.");
+    }
+
+    /**
+     * Processes the response {@link Exchange} returned by the Camel route.
+     */
+    protected abstract void handleResponse(Exchange responseExchange, WorkItem workItem, WorkItemManager manager);
+
+    /**
+     * Builds the request {@link Exchange} that is sent to the Camel route.
+     */
+    protected abstract Exchange buildExchange(ProducerTemplate template, WorkItem workItem);
+
+    @Override
+    public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
+        // Do nothing, cannot be aborted
+    }
+
+    /**
+     * Stops the {@link ProducerTemplate}. Failures are logged and otherwise ignored.
+     */
+    @Override
+    public void close() {
+        try {
+            this.producerTemplate.stop();
+        } catch (Exception e) {
+            logger.warn("Error encountered while closing the Camel Producer Template.", e);
+            // Not much we can do here, so swallowing exception.
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
deleted file mode 100644
index 7734dd9..0000000
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandler.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.jbpm.workitem;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.ExchangeBuilder;
-import org.jbpm.process.workitem.core.AbstractLogOrThrowWorkItemHandler;
-import org.jbpm.process.workitem.core.util.Wid;
-import org.jbpm.process.workitem.core.util.WidMavenDepends;
-import org.jbpm.process.workitem.core.util.WidParameter;
-import org.jbpm.process.workitem.core.util.WidResult;
-import org.jbpm.process.workitem.core.util.service.WidAction;
-import org.jbpm.process.workitem.core.util.service.WidService;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.kie.api.runtime.process.WorkItem;
-import org.kie.api.runtime.process.WorkItemManager;
-import org.kie.internal.runtime.Cacheable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Camel jBPM {@link WorkItemHandler} which allows to call Camel routes with a <code>direct</code> endpoint.
- * <p/>
- * The handler passes the {@WorkItem} to the route that has a consumer on the endpoint-id that can be passed with the
- * <code>camel-endpoint-id</code>{@link WorkItem} parameter. E.g. when a the value "myCamelEndpoint" is passed to the
- * {link WorkItem} via the <code>camel-endpoint-id</code> parameter, this command will send the {@link WorkItem} to the Camel URI
- * <code>direct://myCamelEndpoint</code>.
- * <p/>
- * The body of the result {@link Message} of the invocation is returned via the <code>response</code> parameter. Access to the raw response
- * {@link Message} is provided via the <code>message</code> parameter. This gives the user access to more advanced fields like message
- * headers and attachments.
- * <p/>
- * This handler can be constructed in 2 ways. When using the default constructor, the handler will try to find the global KIE
- * {@link CamelContext} from the <code>jBPM</code> {@link ServiceRegistry}. When the {@link RuntimeManager} is passed to the constructor,
- * the handler will retrieve and use the {@link CamelContext} bound to the {@link RuntimeManage} from the {@link ServiceRegistry}
- *
- */
-@Wid(widfile = "CamelConnector.wid", name = "CamelConnector", displayName = "CamelConnector", defaultHandler = "mvel: new org.apache.camel.component.jbpm.workitem.CamelWorkitemHandler()", documentation = "${artifactId}/index.html", parameters = {
- @WidParameter(name = "camel-endpoint-id") }, results = { @WidResult(name = "response"),
- @WidResult(name = "message") }, mavenDepends = {
- @WidMavenDepends(group = "${groupId}", artifact = "${artifactId}", version = "${version}") }, serviceInfo = @WidService(category = "${name}", description = "${description}", keywords = "apache,camel,payload,route,connector", action = @WidAction(title = "Send payload to a Camel endpoint")))
-public class CamelWorkItemHandler extends AbstractLogOrThrowWorkItemHandler implements Cacheable {
-
- private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
- private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
-
- private static final String CAMEL_ENDPOINT_ID_PARAM = "camel-endpoint-id";
- private static final String RESPONSE_PARAM = "response";
- private static final String MESSAGE_PARAM = "message";
-
- private static Logger logger = LoggerFactory.getLogger(CamelWorkItemHandler.class);
-
- private final ProducerTemplate producerTemplate;
-
- /**
- * Default Constructor. This creates a {@link ProducerTemplate} for the global {@link CamelContext}.
- */
- public CamelWorkItemHandler() {
- CamelContext globalCamelContext = (CamelContext) ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
- // TODO: Should we allow to set the maximumCacheSize on the producer?
- this.producerTemplate = globalCamelContext.createProducerTemplate();
- }
-
- /**
- * Constructor which accepts {@link RuntimeManager}. This causes this WorkItemHanlder to create a {@link ProducerTemplate} for the
- * runtime specific {@link CamelContext}.
- */
- public CamelWorkItemHandler(RuntimeManager runtimeManager) {
- String runtimeCamelContextKey = runtimeManager.getIdentifier() + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX;
- CamelContext runtimeCamelContext = (CamelContext) ServiceRegistry.get().service(runtimeCamelContextKey);
- // TODO: Should we allow to set the maximumCacheSize on the producer?
- this.producerTemplate = runtimeCamelContext.createProducerTemplate();
- }
-
- public void executeWorkItem(WorkItem workItem, final WorkItemManager manager) {
-
- String camelEndpointId = (String) workItem.getParameter(CAMEL_ENDPOINT_ID_PARAM);
-
- // We only support direct. We don't need to support more, as direct simply gives us the entrypoint into the actual Camel Routes.
- String camelUri = "direct://" + camelEndpointId;
- try {
- Exchange inExchange = ExchangeBuilder.anExchange(producerTemplate.getCamelContext()).withBody(workItem).build();
- Exchange outExchange = producerTemplate.send(camelUri, inExchange);
- // producerTemplate.send does not throw exceptions, instead they are set on the returned Exchange.
- if (outExchange.getException() != null) {
- throw outExchange.getException();
- }
- Message outMessage = outExchange.getOut();
-
- Map<String, Object> result = new HashMap<>();
- Object response = outMessage.getBody();
- result.put(RESPONSE_PARAM, response);
- result.put(MESSAGE_PARAM, outMessage);
-
- manager.completeWorkItem(workItem.getId(), result);
- } catch (Exception e) {
- handleException(e);
- }
- }
-
- public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {
- // Do nothing, cannot be aborted
- }
-
- @Override
- public void close() {
- try {
- this.producerTemplate.stop();
- } catch (Exception e) {
- logger.warn("Error encountered while closing the Camel Producer Template.", e);
- // Not much we can do here, so swallowing exception.
- }
- }
-
-}
\ No newline at end of file
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommand.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommand.java
new file mode 100644
index 0000000..e3d3fd1
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommand.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.executor.CommandContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CamelCommand that uses the {@link CamelContext} registered on the {@link ServiceRegistry} for this specific deployment.
+ * <p/>
+ * One {@link ProducerTemplate} is created per deployment id on first use and reused for later invocations; all of them are stopped in
+ * {@link #close()}.
+ */
+public class DeploymentContextCamelCommand extends AbstractCamelCommand {
+
+    private static final String RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX = "_CamelService";
+
+    private static final Logger logger = LoggerFactory.getLogger(DeploymentContextCamelCommand.class);
+
+    /** Producer templates keyed by deployment id. */
+    private final Map<String, ProducerTemplate> templates = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the {@link ProducerTemplate} for the deployment named by the <code>deploymentId</code> entry of the given context, creating
+     * it from the deployment specific {@link CamelContext} when it does not exist yet.
+     *
+     * @throws NullPointerException when the context carries no <code>deploymentId</code>
+     */
+    @Override
+    protected ProducerTemplate getProducerTemplate(CommandContext ctx) {
+        String deploymentId = Objects.requireNonNull((String) ctx.getData("deploymentId"),
+                "The CommandContext does not contain a 'deploymentId'.");
+        // computeIfAbsent creates at most one template per deployment id, so no additional locking is needed.
+        return templates.computeIfAbsent(deploymentId, DeploymentContextCamelCommand::createProducerTemplate);
+    }
+
+    /**
+     * Creates a {@link ProducerTemplate} for the {@link CamelContext} registered in the {@link ServiceRegistry} for the given deployment.
+     */
+    private static ProducerTemplate createProducerTemplate(String deploymentId) {
+        CamelContext deploymentCamelContext = (CamelContext) ServiceRegistry.get()
+                .service(deploymentId + RUNTIME_CAMEL_CONTEXT_SERVICE_POSTFIX);
+        return deploymentCamelContext.createProducerTemplate();
+    }
+
+    /**
+     * Stops every {@link ProducerTemplate} created by this command. Failures are logged and otherwise ignored.
+     */
+    @Override
+    public void close() {
+        for (ProducerTemplate nextTemplate : templates.values()) {
+            try {
+                nextTemplate.stop();
+            } catch (Exception e) {
+                logger.warn("Error encountered while closing the Camel Producer Template.", e);
+                // Not much we can do here, so swallowing exception.
+            }
+        }
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommand.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommand.java
new file mode 100644
index 0000000..974b896
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommand.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.api.executor.CommandContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CamelCommand that uses the global {@link CamelContext} registered on the {@link ServiceRegistry}.
+ * <p/>
+ * A single {@link ProducerTemplate} is created when the command is constructed and is used for every invocation.
+ */
+public class GlobalContextCamelCommand extends AbstractCamelCommand {
+
+    private static final Logger logger = LoggerFactory.getLogger(GlobalContextCamelCommand.class);
+
+    private static final String GLOBAL_CAMEL_CONTEXT_SERVICE_KEY = "GlobalCamelService";
+
+    private final ProducerTemplate globalContextProducerTemplate;
+
+    /**
+     * Looks up the global {@link CamelContext} in the {@link ServiceRegistry} and creates the {@link ProducerTemplate} for it.
+     */
+    public GlobalContextCamelCommand() {
+        // TODO: Should we allow to set the maximumCacheSize on the producer?
+        this.globalContextProducerTemplate = lookupGlobalCamelContext().createProducerTemplate();
+    }
+
+    private static CamelContext lookupGlobalCamelContext() {
+        Object service = ServiceRegistry.get().service(GLOBAL_CAMEL_CONTEXT_SERVICE_KEY);
+        return (CamelContext) service;
+    }
+
+    /**
+     * Returns the template for the global {@link CamelContext}; the given context is not consulted.
+     */
+    @Override
+    protected ProducerTemplate getProducerTemplate(CommandContext ctx) {
+        return globalContextProducerTemplate;
+    }
+
+    /**
+     * Stops the {@link ProducerTemplate}. Failures are logged and otherwise ignored.
+     */
+    @Override
+    public void close() {
+        try {
+            globalContextProducerTemplate.stop();
+        } catch (Exception e) {
+            // Not much we can do here, so swallowing exception.
+            logger.warn("Error encountered while closing the Camel Producer Template.", e);
+        }
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandler.java
new file mode 100644
index 0000000..0588c02
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.util.Wid;
+import org.jbpm.process.workitem.core.util.WidMavenDepends;
+import org.jbpm.process.workitem.core.util.WidParameter;
+import org.jbpm.process.workitem.core.util.WidResult;
+import org.jbpm.process.workitem.core.util.service.WidAction;
+import org.jbpm.process.workitem.core.util.service.WidService;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.api.runtime.process.WorkItemHandler;
+import org.kie.api.runtime.process.WorkItemManager;
+
+/**
+ * Camel jBPM {@link WorkItemHandler} that sends {@link Exchange Exchanges} with an <code>InOnly</code> Message Exchange Pattern.
+ * <p/>
+ * This handler does <b>NOT</b> complete the {@link WorkItem}, and will not parse any response from the Camel route, other than possible exceptions.
+ * The use-case for this handler is asynchronous, one-way, communication, where an external party is responsible for completing the
+ * {@link WorkItem} at a later point in time.
+ * <p/>
+ * The handler creates a Camel Exchange and sets the {@link WorkItem} as the body of the {@link Message}. Furthermore, the following message
+ * headers are set:
+ * <ul>
+ * <li>deploymentId</li>
+ * <li>processInstanceId</li>
+ * <li>workItemId</li>
+ * </ul>
+ */
+@Wid(
+    widfile = "InOnlyCamelConnector.wid",
+    name = "InOnlyCamelConnector",
+    displayName = "InOnlyCamelConnector",
+    defaultHandler = "mvel: new org.apache.camel.component.jbpm.workitem.InOnlyCamelWorkItemHandler()",
+    documentation = "${artifactId}/index.html",
+    parameters = {
+        @WidParameter(name = "camel-endpoint-id")
+    },
+    results = {
+        @WidResult(name = "response"),
+        @WidResult(name = "message") },
+    mavenDepends = {
+        @WidMavenDepends(group = "${groupId}",
+            artifact = "${artifactId}",
+            version = "${version}")
+    },
+    serviceInfo = @WidService(category = "${name}",
+        description = "${description}",
+        keywords = "apache,camel,payload,route,connector",
+        action = @WidAction(title = "Send payload to a Camel endpoint")))
+public class InOnlyCamelWorkItemHandler extends AbstractCamelWorkItemHandler {
+
+    /** Creates a handler that uses the global Camel context and has no default endpoint id. */
+    public InOnlyCamelWorkItemHandler() {
+        super();
+    }
+
+    /** Creates a handler that uses the global Camel context and the given default endpoint id. */
+    public InOnlyCamelWorkItemHandler(String camelEndpointId) {
+        super(camelEndpointId);
+    }
+
+    /** Creates a handler that uses the Camel context of the given {@link RuntimeManager} and has no default endpoint id. */
+    public InOnlyCamelWorkItemHandler(RuntimeManager runtimeManager) {
+        super(runtimeManager);
+    }
+
+    /** Creates a handler that uses the Camel context of the given {@link RuntimeManager} and the given default endpoint id. */
+    public InOnlyCamelWorkItemHandler(RuntimeManager runtimeManager, String camelEndpointId) {
+        super(runtimeManager, camelEndpointId);
+    }
+
+    /**
+     * Does nothing: the response of an <code>InOnly</code> exchange is not used and the {@link WorkItem} is deliberately left open.
+     */
+    @Override
+    protected void handleResponse(Exchange responseExchange, WorkItem workItem, WorkItemManager manager) {
+        // no-op. There is no response for InOnly, so there is nothing to handle.
+    }
+
+    /**
+     * Builds an <code>InOnly</code> {@link Exchange} with the {@link WorkItem} as body and the <code>deploymentId</code>,
+     * <code>processInstanceId</code> and <code>workItemId</code> headers.
+     */
+    @Override
+    protected Exchange buildExchange(ProducerTemplate template, WorkItem workItem) {
+        // getDeploymentId() is called on WorkItemImpl, so a WorkItem of any other implementation fails here with a ClassCastException.
+        return ExchangeBuilder.anExchange(template.getCamelContext())
+                .withPattern(ExchangePattern.InOnly)
+                .withHeader("deploymentId", ((WorkItemImpl) workItem).getDeploymentId())
+                .withHeader("processInstanceId", workItem.getProcessInstanceId())
+                .withHeader("workItemId", workItem.getId())
+                .withBody(workItem).build();
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandler.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandler.java
new file mode 100644
index 0000000..d53c882
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandler.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.util.Wid;
+import org.jbpm.process.workitem.core.util.WidMavenDepends;
+import org.jbpm.process.workitem.core.util.WidParameter;
+import org.jbpm.process.workitem.core.util.WidResult;
+import org.jbpm.process.workitem.core.util.service.WidAction;
+import org.jbpm.process.workitem.core.util.service.WidService;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.kie.api.runtime.process.WorkItem;
+import org.kie.api.runtime.process.WorkItemHandler;
+import org.kie.api.runtime.process.WorkItemManager;
+
+/**
+ * Camel jBPM {@link WorkItemHandler} that sends {@link Exchange Exchanges} with an <code>InOut</code> Message Exchange Pattern.
+ * <p>
+ * This handler reads the out {@link Message} of the response {@link Exchange} and completes the {@link WorkItem} with two results:
+ * <code>response</code> (the message body) and <code>message</code> (the message itself). The use-case is synchronous, request-response communication.
+ * <p>
+ * The handler creates a Camel Exchange and sets the {@link WorkItem} as the body of the {@link Message}. Furthermore, the following message
+ * headers are set:
+ * <ul>
+ * <li>deploymentId</li>
+ * <li>processInstanceId</li>
+ * <li>workItemId</li>
+ * </ul>
+ */
+@Wid(
+ widfile = "InOutCamelConnector.wid",
+ name = "InOutCamelConnector",
+ displayName = "InOutCamelConnector",
+ defaultHandler = "mvel: new org.apache.camel.component.jbpm.workitem.InOutCamelWorkItemHandler()",
+ documentation = "${artifactId}/index.html",
+ parameters = {
+ @WidParameter(name = "camel-endpoint-id")
+ },
+ results = {
+ @WidResult(name = "response"),
+ @WidResult(name = "message") },
+ mavenDepends = {
+ @WidMavenDepends(group = "${groupId}",
+ artifact = "${artifactId}",
+ version = "${version}")
+ },
+ serviceInfo = @WidService(category = "${name}",
+ description = "${description}",
+ keywords = "apache,camel,payload,route,connector",
+ action = @WidAction(title = "Send payload to a Camel endpoint")))
+public class InOutCamelWorkItemHandler extends AbstractCamelWorkItemHandler {
+
+ private static final String RESPONSE_PARAM = "response";
+ private static final String MESSAGE_PARAM = "message";
+
+ public InOutCamelWorkItemHandler() {
+ super();
+ }
+
+ public InOutCamelWorkItemHandler(String camelEndpointId) {
+ super(camelEndpointId);
+ }
+
+ public InOutCamelWorkItemHandler(RuntimeManager runtimeManager) {
+ super(runtimeManager);
+ }
+
+ public InOutCamelWorkItemHandler(RuntimeManager runtimeManager, String camelEndpointId) {
+ super(runtimeManager, camelEndpointId);
+ }
+
+ @Override
+ protected void handleResponse(Exchange responseExchange, WorkItem workItem, WorkItemManager manager) {
+ Message outMessage = responseExchange.getOut();
+ // Expose both the body and the full out Message, matching the two @WidResult names declared on this class.
+ Map<String, Object> result = new HashMap<>();
+ Object response = outMessage.getBody();
+ result.put(RESPONSE_PARAM, response);
+ result.put(MESSAGE_PARAM, outMessage);
+ // Complete the work item with the collected results.
+ manager.completeWorkItem(workItem.getId(), result);
+ }
+
+ @Override
+ protected Exchange buildExchange(ProducerTemplate template, WorkItem workItem) {
+ return ExchangeBuilder.anExchange(template.getCamelContext())
+ .withPattern(ExchangePattern.InOut)
+ .withHeader("deploymentId", ((WorkItemImpl) workItem).getDeploymentId())
+ .withHeader("processInstanceId", workItem.getProcessInstanceId())
+ .withHeader("workItemId", workItem.getId())
+ .withBody(workItem)
+ .build();
+ }
+
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerIntegrationTests.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerIntegrationTests.java
new file mode 100644
index 0000000..39fd861
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerIntegrationTests.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.hamcrest.CoreMatchers.*;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException;
+import org.jbpm.process.workitem.core.TestWorkItemManager;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.kie.api.runtime.process.WorkItemHandler;
+
+//http://camel.apache.org/using-getin-or-getout-methods-on-exchange.html
+//http://camel.apache.org/async.html
+public class CamelWorkItemHandlerIntegrationTests extends CamelTestSupport {
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint resultEndpoint;
+
+ @Produce(uri = "direct:start")
+ protected ProducerTemplate template;
+
+ @Test
+ public void testSyncInOnly() throws Exception {
+ // Setup
+ String routeId = "testSyncInOnlyRoute";
+ RouteBuilder builder = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId(routeId)
+ .setBody(simple("${body.getParameter(\"request\")}"))
+ .to("mock:result");
+ }
+ };
+ context.addRoutes(builder);
+ try {
+ // Register the Camel Context with the jBPM ServiceRegistry.
+ ServiceRegistry.get().register("GlobalCamelService", context);
+
+ // Test
+ String expectedBody = "helloRequest";
+ resultEndpoint.expectedBodiesReceived(expectedBody);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", "start");
+ workItem.setParameter("request", expectedBody);
+
+ TestWorkItemManager manager = new TestWorkItemManager();
+
+ WorkItemHandler handler = new InOnlyCamelWorkItemHandler();
+
+ handler.executeWorkItem(workItem, manager);
+
+ // Assertions
+ assertThat(manager.getResults().size(), equalTo(0));
+ resultEndpoint.assertIsSatisfied();
+ } finally {
+ // Cleanup
+ context.removeRoute(routeId);
+ }
+ }
+
+ @Test(expected = WorkItemHandlerRuntimeException.class)
+ public void testSyncInOnlyException() throws Exception {
+ // Setup
+ String routeId = "testSyncInOnlyExceptionRoute";
+ RouteBuilder builder = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId(routeId)
+ .setBody(simple("${body.getParameter(\"request\")}"))
+ .throwException(new IllegalArgumentException("Illegal content!"))
+ .to("mock:result");
+ }
+ };
+ context.addRoutes(builder);
+ try {
+ // Register the Camel Context with the jBPM ServiceRegistry.
+ ServiceRegistry.get().register("GlobalCamelService", context);
+
+ // Test
+ String expectedBody = "helloRequest";
+ resultEndpoint.expectedBodiesReceived(expectedBody);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", "start");
+ workItem.setParameter("request", expectedBody);
+
+ TestWorkItemManager manager = new TestWorkItemManager();
+
+ WorkItemHandler handler = new InOnlyCamelWorkItemHandler();
+
+ handler.executeWorkItem(workItem, manager);
+
+ // No assertions here: this test passes only when executeWorkItem throws,
+ // which the 'expected' attribute on @Test verifies, so statements placed
+ // after the call above would never run in a passing test.
+ } finally {
+ // Cleanup
+ context.removeRoute(routeId);
+ }
+ }
+
+ @Test
+ public void testSyncInOut() throws Exception {
+ // Setup
+ String routeId = "testSyncInOutRoute";
+ RouteBuilder builder = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId(routeId)
+ .setBody(simple("${body.getParameter(\"request\")}"))
+ .to("mock:result");
+ }
+ };
+ context.addRoutes(builder);
+ try {
+ // Register the Camel Context with the jBPM ServiceRegistry.
+ ServiceRegistry.get().register("GlobalCamelService", context);
+
+ // Test
+ String expectedBody = "helloRequest";
+ resultEndpoint.expectedBodiesReceived(expectedBody);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", "start");
+ workItem.setParameter("request", expectedBody);
+
+ TestWorkItemManager manager = new TestWorkItemManager();
+
+ AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler();
+
+ handler.executeWorkItem(workItem, manager);
+
+ // Assertions
+ assertThat(manager.getResults().size(), equalTo(1));
+ resultEndpoint.assertIsSatisfied();
+ } finally {
+ // Cleanup
+ context.removeRoute(routeId);
+ }
+
+ }
+
+ @Test(expected = WorkItemHandlerRuntimeException.class)
+ public void testSyncInOutException() throws Exception {
+ // Setup
+ String routeId = "testSyncInOutExceptionRoute";
+ RouteBuilder builder = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId(routeId)
+ .setBody(simple("${body.getParameter(\"request\")}"))
+ .throwException(new IllegalArgumentException("Illegal content!"))
+ .to("mock:result");
+ }
+ };
+ context.addRoutes(builder);
+ try {
+ // Register the Camel Context with the jBPM ServiceRegistry.
+ ServiceRegistry.get().register("GlobalCamelService", context);
+
+ // Test
+ String expectedBody = "helloRequest";
+ resultEndpoint.expectedBodiesReceived(expectedBody);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", "start");
+ workItem.setParameter("request", expectedBody);
+
+ TestWorkItemManager manager = new TestWorkItemManager();
+
+ WorkItemHandler handler = new InOutCamelWorkItemHandler();
+
+ handler.executeWorkItem(workItem, manager);
+ } finally {
+ // Cleanup
+ context.removeRoute(routeId);
+ }
+ }
+
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
deleted file mode 100644
index 7872132..0000000
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelWorkItemHandlerTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2018 Red Hat, Inc. and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.jbpm.workitem;
-
-import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.*;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.drools.core.process.instance.impl.WorkItemImpl;
-import org.jbpm.process.workitem.core.TestWorkItemManager;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class CamelWorkItemHandlerTest {
-
- @Mock
- ProducerTemplate producerTemplate;
-
- @Mock
- Exchange outExchange;
-
- @Mock
- Message outMessage;
-
- @Mock
- CamelContext camelContext;
-
- @Mock
- RuntimeManager runtimeManager;
-
- @Test
- public void testExecuteGlobalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct://" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- ServiceRegistry.get().register("GlobalCamelService", camelContext);
-
-
- TestWorkItemManager manager = new TestWorkItemManager();
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter("camel-endpoint-id", camelEndpointId);
- workItem.setParameter("request", "someRequest");
-
- CamelWorkItemHandler handler = new CamelWorkItemHandler();
-
- handler.executeWorkItem(workItem,
- manager);
- assertNotNull(manager.getResults());
- assertEquals(1,
- manager.getResults().size());
- assertTrue(manager.getResults().containsKey(workItem.getId()));
- Map<String, Object> results = manager.getResults(workItem.getId());
- assertEquals(2, results.size());
- assertEquals(testReponse, results.get("response"));
- }
-
- @Test
- public void testExecuteLocalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct://" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- //Register the RuntimeManager bound camelcontext.
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter("camel-endpoint-id", camelEndpointId);
- workItem.setParameter("request", "someRequest");
-
- CamelWorkItemHandler handler = new CamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- handler.executeWorkItem(workItem,
- manager);
- assertNotNull(manager.getResults());
- assertEquals(1,
- manager.getResults().size());
- assertTrue(manager.getResults().containsKey(workItem.getId()));
- Map<String, Object> results = manager.getResults(workItem.getId());
- assertEquals(2, results.size());
- assertEquals(testReponse, results.get("response"));
- }
-
-
-}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommandTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommandTest.java
new file mode 100644
index 0000000..b703456
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/DeploymentContextCamelCommandTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.executor.Command;
+import org.kie.api.executor.CommandContext;
+import org.kie.api.executor.ExecutionResults;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DeploymentContextCamelCommandTest {
+
+ @Mock
+ ProducerTemplate producerTemplate;
+
+ @Mock
+ Exchange outExchange;
+
+ @Mock
+ Message outMessage;
+
+ @Mock
+ CamelContext camelContext;
+
+ @Mock
+ RuntimeManager runtimeManager;
+
+ @Mock
+ CommandContext commandContext;
+
+ @Test
+ public void testExecuteCommandDeploymentCamelContext() throws Exception {
+ // The stubbed ProducerTemplate below only answers for the URI direct://<camel-endpoint-id>.
+ String camelEndpointId = "testCamelRoute";
+ String camelRouteUri = "direct://" + camelEndpointId;
+
+ String testReponse = "testResponse";
+
+ String deploymentId = "testDeployment";
+
+ when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
+ when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+ when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+
+ when(outExchange.getOut()).thenReturn(outMessage);
+ when(outMessage.getBody()).thenReturn(testReponse);
+
+ // Register the mocked CamelContext under the deployment-scoped service name (<deploymentId>_CamelService).
+ ServiceRegistry.get().register(deploymentId + "_CamelService", camelContext);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", camelEndpointId);
+ workItem.setParameter("request", "someRequest");
+
+ when(commandContext.getData("workItem")).thenReturn(workItem);
+ when(commandContext.getData("deploymentId")).thenReturn(deploymentId);
+
+ Command command = new DeploymentContextCamelCommand();
+ ExecutionResults results = command.execute(commandContext);
+
+ assertNotNull(results);
+ assertEquals(2, results.getData().size());
+ assertEquals(testReponse, results.getData().get("response"));
+ }
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommandTest.java
similarity index 54%
rename from components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
rename to components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommandTest.java
index 543b833..ad36879 100644
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/CamelCommandTest.java
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/GlobalContextCamelCommandTest.java
@@ -1,11 +1,12 @@
-/*
- * Copyright 2018 Red Hat, Inc. and/or its affiliates.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.camel.component.jbpm.workitem;
import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
-import static org.mockito.ArgumentMatchers.any;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -27,6 +29,7 @@ import org.drools.core.process.instance.impl.WorkItemImpl;
import org.jbpm.services.api.service.ServiceRegistry;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.kie.api.executor.Command;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutionResults;
import org.kie.api.runtime.manager.RuntimeManager;
@@ -34,7 +37,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-public class CamelCommandTest {
+public class GlobalContextCamelCommandTest {
@Mock
ProducerTemplate producerTemplate;
@@ -55,7 +58,7 @@ public class CamelCommandTest {
CommandContext commandContext;
@Test
- public void testExecuteComamnd() throws Exception {
+ public void testExecuteGlobalCommand() throws Exception {
String camelEndpointId = "testCamelRoute";
String camelRouteUri = "direct://" + camelEndpointId;
@@ -83,7 +86,7 @@ public class CamelCommandTest {
when(commandContext.getData(anyString())).thenReturn(workItem);
- CamelCommand command = new CamelCommand();
+ Command command = new GlobalContextCamelCommand();
ExecutionResults results = command.execute(commandContext);
@@ -91,44 +94,4 @@ public class CamelCommandTest {
assertEquals(2, results.getData().size());
assertEquals(testReponse, results.getData().get("response"));
}
-
-
- @Test
- public void testExecuteCommandLocalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct://" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- when(producerTemplate.send(eq(camelRouteUri), any(Exchange.class))).thenReturn(outExchange);
- //when(producerTemplate.send(argThat(not(camelRouteUri)), any(Exchange.class))).thenThrow(new IllegalArgumentException("Unexpected route id"));
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- //Register the RuntimeManager bound camelcontext.
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter("camel-endpoint-id", camelEndpointId);
- workItem.setParameter("request", "someRequest");
-
- when(commandContext.getData(anyString())).thenReturn(workItem);
-
- CamelCommand command = new CamelCommand(runtimeManager);
- ExecutionResults results = command.execute(commandContext);
-
- assertNotNull(results);
- assertEquals(2, results.getData().size());
- assertEquals(testReponse, results.getData().get("response"));
- }
-
}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
new file mode 100644
index 0000000..1ad1bf9
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultHeadersMapFactory;
+import org.apache.camel.spi.HeadersMapFactory;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.TestWorkItemManager;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class InOnlyCamelWorkItemHandlerTest {
+
+ @Mock
+ ProducerTemplate producerTemplate;
+
+ @Mock
+ Exchange outExchange;
+
+ @Mock
+ Message outMessage;
+
+ @Mock
+ CamelContext camelContext;
+
+ @Mock
+ RuntimeManager runtimeManager;
+
+ @Test
+ public void testExecuteInOnlyLocalCamelContext() throws Exception {
+ // The stubbed ProducerTemplate below only answers for the URI direct://<camel-endpoint-id>.
+ String camelEndpointId = "testCamelRoute";
+ String camelRouteUri = "direct://" + camelEndpointId;
+
+ String testReponse = "testResponse";
+ // NOTE(review): testReponse is never referenced again in this test.
+ String runtimeManagerId = "testRuntimeManager";
+
+ when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+
+ when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
+ when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+
+ when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+ HeadersMapFactory hmf = new DefaultHeadersMapFactory();
+ when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
+
+ // Register the mocked CamelContext under the RuntimeManager-scoped service name (<runtimeManagerId>_CamelService).
+ ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
+
+ WorkItemImpl workItem = new WorkItemImpl();
+ workItem.setParameter("camel-endpoint-id", camelEndpointId);
+ workItem.setParameter("request", "someRequest");
+ workItem.setDeploymentId("testDeploymentId");
+ workItem.setProcessInstanceId(1L);
+ workItem.setId(1L);
+
+ AbstractCamelWorkItemHandler handler = new InOnlyCamelWorkItemHandler(runtimeManager);
+
+ TestWorkItemManager manager = new TestWorkItemManager();
+ handler.executeWorkItem(workItem,
+ manager);
+ assertThat(manager.getResults(), is(notNullValue()));
+ // InOnly does not complete the WorkItem, so the manager must hold no results.
+ assertThat(manager.getResults().size(), equalTo(0));
+ }
+}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
new file mode 100644
index 0000000..e4bc6ba
--- /dev/null
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.workitem;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultHeadersMapFactory;
+import org.apache.camel.spi.HeadersMapFactory;
+import org.drools.core.process.instance.impl.WorkItemImpl;
+import org.jbpm.process.workitem.core.TestWorkItemManager;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kie.api.runtime.manager.RuntimeManager;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link InOutCamelWorkItemHandler}: the handler is executed against mocked Camel
+ * objects and the results it hands to the {@link TestWorkItemManager} are verified.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class InOutCamelWorkItemHandlerTest {
+
+    private static final String CAMEL_ENDPOINT_ID = "testCamelRoute";
+    private static final String CAMEL_ROUTE_URI = "direct://" + CAMEL_ENDPOINT_ID;
+    private static final String TEST_RESPONSE = "testResponse";
+
+    @Mock
+    ProducerTemplate producerTemplate;
+
+    @Mock
+    Exchange outExchange;
+
+    @Mock
+    Message outMessage;
+
+    @Mock
+    CamelContext camelContext;
+
+    @Mock
+    RuntimeManager runtimeManager;
+
+    /** Executes a work item using the CamelContext registered as "GlobalCamelService". */
+    @Test
+    public void testExecuteInOutGlobalCamelContext() throws Exception {
+        String camelServiceKey = "GlobalCamelService";
+        stubCamelRoute();
+
+        ServiceRegistry.get().register(camelServiceKey, camelContext);
+        try {
+            TestWorkItemManager manager = new TestWorkItemManager();
+            WorkItemImpl workItem = createWorkItem();
+
+            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler();
+            handler.executeWorkItem(workItem, manager);
+
+            assertResponseReturned(manager, workItem);
+        } finally {
+            // The registry behind ServiceRegistry.get() outlives this test: drop the mock again.
+            ServiceRegistry.get().remove(camelServiceKey);
+        }
+    }
+
+    /** Executes a work item using the CamelContext registered for the RuntimeManager's identifier. */
+    @Test
+    public void testExecuteInOutLocalCamelContext() throws Exception {
+        String runtimeManagerId = "testRuntimeManager";
+        String camelServiceKey = runtimeManagerId + "_CamelService";
+
+        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
+        stubCamelRoute();
+
+        // Register the RuntimeManager bound camelcontext.
+        ServiceRegistry.get().register(camelServiceKey, camelContext);
+        try {
+            WorkItemImpl workItem = createWorkItem();
+            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
+
+            TestWorkItemManager manager = new TestWorkItemManager();
+            handler.executeWorkItem(workItem, manager);
+
+            assertResponseReturned(manager, workItem);
+        } finally {
+            // The registry behind ServiceRegistry.get() outlives this test: drop the mock again.
+            ServiceRegistry.get().remove(camelServiceKey);
+        }
+    }
+
+    /** Stubs the mocks so that sending to the test route yields an exchange whose out body is the test response. */
+    private void stubCamelRoute() {
+        when(producerTemplate.send(eq(CAMEL_ROUTE_URI), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
+        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
+        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
+        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
+        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
+        when(outExchange.getOut()).thenReturn(outMessage);
+        when(outMessage.getBody()).thenReturn(TEST_RESPONSE);
+    }
+
+    /** Builds the work item that both tests execute: it targets the test endpoint and carries a dummy request. */
+    private static WorkItemImpl createWorkItem() {
+        WorkItemImpl workItem = new WorkItemImpl();
+        workItem.setParameter("camel-endpoint-id", CAMEL_ENDPOINT_ID);
+        workItem.setParameter("request", "someRequest");
+        workItem.setDeploymentId("testDeploymentId");
+        workItem.setProcessInstanceId(1L);
+        workItem.setId(1L);
+        return workItem;
+    }
+
+    /** Asserts that results exist for this work item only, hold two entries, and map "response" to the test response. */
+    private static void assertResponseReturned(TestWorkItemManager manager, WorkItemImpl workItem) {
+        assertThat(manager.getResults(), is(notNullValue()));
+        assertThat(manager.getResults().size(), equalTo(1));
+        assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
+
+        Map<String, Object> results = manager.getResults(workItem.getId());
+        assertThat(results.size(), equalTo(2));
+        assertThat(results.get("response"), equalTo(TEST_RESPONSE));
+    }
+}