You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/12/09 03:11:36 UTC
[32/50] [abbrv] git commit: New component for Amazon Simple Workflow
Service
New component for Amazon Simple Workflow Service
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c1d3a88
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c1d3a88
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c1d3a88
Branch: refs/heads/camel-gora
Commit: 1c1d3a883202e6de89ba9eee9bdedcc55cf15584
Parents: 88a5104
Author: Bilgin Ibryam <bi...@apache.com>
Authored: Tue Dec 3 23:49:55 2013 +0000
Committer: Bilgin Ibryam <bi...@apache.com>
Committed: Tue Dec 3 23:49:55 2013 +0000
----------------------------------------------------------------------
components/camel-aws/pom.xml | 3 +-
.../aws/swf/CamelActivityImplementation.java | 102 ++++++++
.../swf/CamelActivityImplementationFactory.java | 61 +++++
.../aws/swf/CamelSWFActivityClient.java | 66 ++++++
.../aws/swf/CamelSWFWorkflowClient.java | 119 ++++++++++
.../aws/swf/CamelWorkflowDefinition.java | 139 +++++++++++
.../aws/swf/CamelWorkflowDefinitionFactory.java | 58 +++++
.../CamelWorkflowDefinitionFactoryFactory.java | 52 ++++
.../component/aws/swf/SWFActivityConsumer.java | 69 ++++++
.../component/aws/swf/SWFActivityProducer.java | 61 +++++
.../camel/component/aws/swf/SWFComponent.java | 44 ++++
.../component/aws/swf/SWFConfiguration.java | 236 +++++++++++++++++++
.../camel/component/aws/swf/SWFConstants.java | 37 +++
.../camel/component/aws/swf/SWFEndpoint.java | 141 +++++++++++
.../component/aws/swf/SWFWorkflowConsumer.java | 87 +++++++
.../component/aws/swf/SWFWorkflowProducer.java | 170 +++++++++++++
.../services/org/apache/camel/component/aws-swf | 1 +
.../component/aws/swf/AmazonSWFClientMock.java | 32 +++
.../aws/swf/CamelSWFActivityClientTest.java | 51 ++++
.../aws/swf/CamelSWFActivityConsumerTest.java | 68 ++++++
.../component/aws/swf/CamelSWFTestSupport.java | 54 +++++
.../aws/swf/CamelSWFWorkflowClientTest.java | 143 +++++++++++
.../aws/swf/CamelSWFWorkflowConsumerTest.java | 66 ++++++
.../aws/swf/CamelSWFWorkflowProducerTest.java | 60 +++++
.../aws/swf/SwfComponentSpringTest.java | 59 +++++
.../swf/integration/CamelSWFEndToEndTest.java | 90 +++++++
.../aws/swf/SwfComponentSpringTest-context.xml | 32 +++
27 files changed, 2100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-aws/pom.xml b/components/camel-aws/pom.xml
index 9440099..f05536a 100644
--- a/components/camel-aws/pom.xml
+++ b/components/camel-aws/pom.xml
@@ -40,7 +40,8 @@
org.apache.camel.spi.ComponentResolver;component=aws-sdb,
org.apache.camel.spi.ComponentResolver;component=aws-ses,
org.apache.camel.spi.ComponentResolver;component=aws-sns,
- org.apache.camel.spi.ComponentResolver;component=aws-sqs
+ org.apache.camel.spi.ComponentResolver;component=aws-sqs,
+ org.apache.camel.spi.ComponentResolver;component=aws-swf
</camel.osgi.export.service>
</properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementation.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementation.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementation.java
new file mode 100644
index 0000000..042f2b8
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementation.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.CancellationException;
+
+import com.amazonaws.services.simpleworkflow.flow.ActivityExecutionContext;
+import com.amazonaws.services.simpleworkflow.flow.ActivityExecutionContextProviderImpl;
+import com.amazonaws.services.simpleworkflow.flow.ActivityFailureException;
+import com.amazonaws.services.simpleworkflow.flow.DataConverter;
+import com.amazonaws.services.simpleworkflow.flow.DataConverterException;
+import com.amazonaws.services.simpleworkflow.flow.common.WorkflowExecutionUtils;
+import com.amazonaws.services.simpleworkflow.flow.generic.ActivityImplementationBase;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeExecutionOptions;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeRegistrationOptions;
+import com.amazonaws.services.simpleworkflow.flow.worker.CurrentActivityExecutionContext;
+
+public class CamelActivityImplementation extends ActivityImplementationBase {
+ private final ActivityTypeExecutionOptions executionOptions;
+ private final ActivityTypeRegistrationOptions registrationOptions;
+ private final DataConverter converter;
+ private final SWFActivityConsumer swfWorkflowConsumer;
+ private final ActivityExecutionContextProviderImpl contextProvider = new ActivityExecutionContextProviderImpl();
+
+ public CamelActivityImplementation(SWFActivityConsumer swfWorkflowConsumer, ActivityTypeRegistrationOptions activityTypeRegistrationOptions,
+ ActivityTypeExecutionOptions activityTypeExecutionOptions, DataConverter converter) {
+ this.swfWorkflowConsumer = swfWorkflowConsumer;
+ this.registrationOptions = activityTypeRegistrationOptions;
+ this.executionOptions = activityTypeExecutionOptions;
+ this.converter = converter;
+ }
+
+ @Override
+ public ActivityTypeRegistrationOptions getRegistrationOptions() {
+ return registrationOptions;
+ }
+
+ @Override
+ public ActivityTypeExecutionOptions getExecutionOptions() {
+ return executionOptions;
+ }
+
+ @Override
+ protected String execute(String input, ActivityExecutionContext context) throws ActivityFailureException, CancellationException {
+ Object[] inputParameters = converter.fromData(input, Object[].class);
+ CurrentActivityExecutionContext.set(context);
+ Object result = null;
+
+ ActivityExecutionContext executionContext = contextProvider.getActivityExecutionContext();
+ String taskToken = executionContext.getTaskToken();
+
+ try {
+ result = swfWorkflowConsumer.processActivity(inputParameters, taskToken);
+ } catch (InvocationTargetException invocationException) {
+ throwActivityFailureException(invocationException.getTargetException() != null ? invocationException.getTargetException() : invocationException);
+ } catch (IllegalArgumentException illegalArgumentException) {
+ throwActivityFailureException(illegalArgumentException);
+ } catch (IllegalAccessException illegalAccessException) {
+ throwActivityFailureException(illegalAccessException);
+ } catch (Exception e) {
+ throwActivityFailureException(e);
+ } finally {
+ CurrentActivityExecutionContext.unset();
+ }
+ String resultSerialized = converter.toData(result);
+ return resultSerialized;
+ }
+
+ void throwActivityFailureException(Throwable exception) throws ActivityFailureException, CancellationException {
+ if (exception instanceof CancellationException) {
+ throw (CancellationException) exception;
+ }
+
+ String reason = WorkflowExecutionUtils.truncateReason(exception.getMessage());
+ String details = null;
+ try {
+ details = converter.toData(exception);
+ } catch (DataConverterException dataConverterException) {
+ if (dataConverterException.getCause() == null) {
+ dataConverterException.initCause(exception);
+ }
+ throw dataConverterException;
+ }
+
+ throw new ActivityFailureException(reason, details);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementationFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementationFactory.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementationFactory.java
new file mode 100644
index 0000000..15a487e
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelActivityImplementationFactory.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.ArrayList;
+
+import com.amazonaws.services.simpleworkflow.flow.DataConverter;
+import com.amazonaws.services.simpleworkflow.flow.JsonDataConverter;
+import com.amazonaws.services.simpleworkflow.flow.generic.ActivityImplementation;
+import com.amazonaws.services.simpleworkflow.flow.generic.ActivityImplementationFactory;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeExecutionOptions;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeRegistrationOptions;
+import com.amazonaws.services.simpleworkflow.model.ActivityType;
+
+public class CamelActivityImplementationFactory extends ActivityImplementationFactory {
+ private SWFActivityConsumer swfWorkflowConsumer;
+ private SWFConfiguration configuration;
+
+ public CamelActivityImplementationFactory(SWFActivityConsumer swfWorkflowConsumer, SWFConfiguration configuration) {
+ this.swfWorkflowConsumer = swfWorkflowConsumer;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Iterable<ActivityType> getActivityTypesToRegister() {
+ ArrayList<ActivityType> activityTypes = new ArrayList<ActivityType>(1);
+ ActivityType activityType = new ActivityType();
+ activityType.setName(configuration.getEventName());
+ activityType.setVersion(configuration.getVersion());
+ activityTypes.add(activityType);
+ return activityTypes;
+ }
+
+ @Override
+ public ActivityImplementation getActivityImplementation(ActivityType activityType) {
+ ActivityTypeExecutionOptions activityTypeExecutionOptions = configuration.getActivityTypeExecutionOptions() != null
+ ? configuration.getActivityTypeExecutionOptions() : new ActivityTypeExecutionOptions();
+
+ ActivityTypeRegistrationOptions activityTypeRegistrationOptions = configuration.getActivityTypeRegistrationOptions() != null
+ ? configuration.getActivityTypeRegistrationOptions() : new ActivityTypeRegistrationOptions();
+
+ DataConverter dataConverter = configuration.getDataConverter() != null
+ ? configuration.getDataConverter() : new JsonDataConverter();
+
+ return new CamelActivityImplementation(swfWorkflowConsumer, activityTypeRegistrationOptions, activityTypeExecutionOptions, dataConverter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFActivityClient.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFActivityClient.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFActivityClient.java
new file mode 100644
index 0000000..f75b7de
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFActivityClient.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.flow.DynamicActivitiesClient;
+import com.amazonaws.services.simpleworkflow.flow.DynamicActivitiesClientImpl;
+import com.amazonaws.services.simpleworkflow.flow.core.Promise;
+import com.amazonaws.services.simpleworkflow.model.ActivityType;
+
+public class CamelSWFActivityClient {
+ private final DynamicActivitiesClient dynamicActivitiesClient;
+ private SWFConfiguration configuration;
+
+ public CamelSWFActivityClient(SWFConfiguration configuration) {
+ this.configuration = configuration;
+ dynamicActivitiesClient = getDynamicActivitiesClient();
+ }
+
+ public Object scheduleActivity(String eventName, String version, Object input) {
+ ActivityType activity = new ActivityType();
+ activity.setName(eventName);
+ activity.setVersion(version);
+
+ Promise[] promises = asPromiseArray(input);
+ Promise<?> promise = dynamicActivitiesClient.scheduleActivity(activity, promises, configuration.getActivitySchedulingOptions(), Object.class, null);
+ return promise;
+ }
+
+ protected Promise[] asPromiseArray(Object input) {
+ Promise[] promises;
+ if (input instanceof Object[]) {
+ Object[] inputArray = (Object[])input;
+ promises = new Promise[inputArray.length];
+ for (int i = 0; i < inputArray.length; i++) {
+ promises[i] = Promise.asPromise(inputArray[i]);
+ }
+ } else {
+ promises = new Promise[1];
+ if (input instanceof Promise) {
+ promises[0] = (Promise) input;
+ } else {
+ promises[0] = Promise.asPromise(input);
+ }
+ }
+ return promises;
+ }
+
+ DynamicActivitiesClient getDynamicActivitiesClient() {
+ return new DynamicActivitiesClientImpl();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFWorkflowClient.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFWorkflowClient.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFWorkflowClient.java
new file mode 100644
index 0000000..f4ce393
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelSWFWorkflowClient.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.simpleworkflow.flow.DynamicWorkflowClientExternal;
+import com.amazonaws.services.simpleworkflow.flow.DynamicWorkflowClientExternalImpl;
+import com.amazonaws.services.simpleworkflow.flow.common.WorkflowExecutionUtils;
+import com.amazonaws.services.simpleworkflow.flow.worker.GenericWorkflowClientExternalImpl;
+import com.amazonaws.services.simpleworkflow.model.ChildPolicy;
+import com.amazonaws.services.simpleworkflow.model.DescribeWorkflowExecutionRequest;
+import com.amazonaws.services.simpleworkflow.model.HistoryEvent;
+import com.amazonaws.services.simpleworkflow.model.WorkflowExecution;
+import com.amazonaws.services.simpleworkflow.model.WorkflowExecutionDetail;
+import com.amazonaws.services.simpleworkflow.model.WorkflowExecutionInfo;
+import com.amazonaws.services.simpleworkflow.model.WorkflowType;
+
+public class CamelSWFWorkflowClient {
+ private final SWFEndpoint endpoint;
+ private final SWFConfiguration configuration;
+
+ public CamelSWFWorkflowClient(SWFEndpoint endpoint, SWFConfiguration configuration) {
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ }
+
+ public void signalWorkflowExecution(String workflowId, String runId, String signalName, Object arguments) {
+ DynamicWorkflowClientExternal dynamicWorkflowClientExternal = getDynamicWorkflowClient(workflowId, runId);
+ dynamicWorkflowClientExternal.signalWorkflowExecution(signalName, toArray(arguments));
+ }
+
+ public Object getWorkflowExecutionState(String workflowId, String runId, Class aClass) throws Throwable {
+ DynamicWorkflowClientExternal dynamicWorkflowClientExternal = getDynamicWorkflowClient(workflowId, runId);
+ return dynamicWorkflowClientExternal.getWorkflowExecutionState(aClass);
+ }
+
+ public void requestCancelWorkflowExecution(String workflowId, String runId) {
+ DynamicWorkflowClientExternal dynamicWorkflowClientExternal = getDynamicWorkflowClient(workflowId, runId);
+ dynamicWorkflowClientExternal.requestCancelWorkflowExecution();
+ }
+
+ public void terminateWorkflowExecution(String workflowId, String runId, String reason, String details, String childPolicy) {
+ DynamicWorkflowClientExternal dynamicWorkflowClientExternal = getDynamicWorkflowClient(workflowId, runId);
+ ChildPolicy policy = childPolicy != null ? ChildPolicy.valueOf(childPolicy) : null;
+ dynamicWorkflowClientExternal.terminateWorkflowExecution(reason, details, policy);
+ }
+
+ public String[] startWorkflowExecution(String workflowId, String runId, String eventName, String version, Object arguments) {
+ DynamicWorkflowClientExternalImpl dynamicWorkflowClientExternal = (DynamicWorkflowClientExternalImpl) getDynamicWorkflowClient(workflowId, runId);
+
+ WorkflowType workflowType = new WorkflowType();
+ workflowType.setName(eventName);
+ workflowType.setVersion(version);
+ dynamicWorkflowClientExternal.setWorkflowType(workflowType);
+ dynamicWorkflowClientExternal.startWorkflowExecution(toArray(arguments));
+
+ String newWorkflowId = dynamicWorkflowClientExternal.getWorkflowExecution().getWorkflowId();
+ String newRunId = dynamicWorkflowClientExternal.getWorkflowExecution().getRunId();
+
+ return new String[] {newWorkflowId, newRunId};
+ }
+
+ public Map<String, Object> describeWorkflowInstance(String workflowId, String runId) {
+ DescribeWorkflowExecutionRequest describeRequest = new DescribeWorkflowExecutionRequest();
+ describeRequest.setDomain(configuration.getDomainName());
+ describeRequest.setExecution(new WorkflowExecution().withWorkflowId(workflowId).withRunId(runId));
+ WorkflowExecutionDetail executionDetail = endpoint.getSWClient().describeWorkflowExecution(describeRequest);
+ WorkflowExecutionInfo instanceMetadata = executionDetail.getExecutionInfo();
+
+ Map<String, Object> info = new HashMap<String, Object>();
+ info.put("closeStatus", instanceMetadata.getCloseStatus());
+ info.put("closeTimestamp", instanceMetadata.getCloseTimestamp());
+ info.put("executionStatus", instanceMetadata.getExecutionStatus());
+ info.put("tagList", instanceMetadata.getTagList());
+ info.put("executionDetail", executionDetail);
+ return info;
+ }
+
+ public List<HistoryEvent> getWorkflowExecutionHistory(String workflowId, String runId) {
+ return WorkflowExecutionUtils.getHistory(endpoint.getSWClient(),
+ configuration.getDomainName(), new WorkflowExecution().withWorkflowId(workflowId).withRunId(runId));
+ }
+
+ DynamicWorkflowClientExternal getDynamicWorkflowClient(String workflowId, String runId) {
+ GenericWorkflowClientExternalImpl genericClient = new GenericWorkflowClientExternalImpl(endpoint.getSWClient(), configuration.getDomainName());
+ WorkflowExecution workflowExecution = new WorkflowExecution();
+ workflowExecution.setWorkflowId(workflowId != null ? workflowId : genericClient.generateUniqueId());
+ workflowExecution.setRunId(runId);
+ return new DynamicWorkflowClientExternalImpl(workflowExecution, null, endpoint.getStartWorkflowOptions(), null, genericClient);
+ }
+
+ private Object[] toArray(Object input) {
+ Object[] inputArray;
+ if (input instanceof Object[]) {
+ inputArray = (Object[])input;
+ } else {
+ inputArray = new Object[1];
+ inputArray[0] = input;
+ }
+ return inputArray;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinition.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinition.java
new file mode 100644
index 0000000..ba95fea
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinition.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.amazonaws.services.simpleworkflow.flow.DataConverter;
+import com.amazonaws.services.simpleworkflow.flow.DataConverterException;
+import com.amazonaws.services.simpleworkflow.flow.DecisionContext;
+import com.amazonaws.services.simpleworkflow.flow.DecisionContextProvider;
+import com.amazonaws.services.simpleworkflow.flow.DecisionContextProviderImpl;
+import com.amazonaws.services.simpleworkflow.flow.JsonDataConverter;
+import com.amazonaws.services.simpleworkflow.flow.WorkflowClock;
+import com.amazonaws.services.simpleworkflow.flow.WorkflowException;
+import com.amazonaws.services.simpleworkflow.flow.common.WorkflowExecutionUtils;
+import com.amazonaws.services.simpleworkflow.flow.core.Promise;
+import com.amazonaws.services.simpleworkflow.flow.core.Settable;
+import com.amazonaws.services.simpleworkflow.flow.core.TryCatchFinally;
+import com.amazonaws.services.simpleworkflow.flow.generic.WorkflowDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamelWorkflowDefinition extends WorkflowDefinition {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(CamelWorkflowDefinition.class);
+
+ private SWFWorkflowConsumer swfWorkflowConsumer;
+ private DecisionContext decisionContext;
+ private JsonDataConverter dataConverter;
+
+ private final DecisionContextProvider contextProvider = new DecisionContextProviderImpl();
+ private final WorkflowClock workflowClock = contextProvider.getDecisionContext().getWorkflowClock();
+
+ public CamelWorkflowDefinition(SWFWorkflowConsumer swfWorkflowConsumer, DecisionContext decisionContext, JsonDataConverter dataConverter) {
+ this.swfWorkflowConsumer = swfWorkflowConsumer;
+ this.decisionContext = decisionContext;
+ this.dataConverter = dataConverter;
+ }
+
+ @Override
+ public Promise<String> execute(final String input) throws WorkflowException {
+ final Settable<String> result = new Settable<String>();
+ final AtomicReference<Promise> methodResult = new AtomicReference<Promise>();
+ new TryCatchFinally() {
+
+ @Override
+ protected void doTry() throws Throwable {
+ Object[] parameters = dataConverter.fromData(input, Object[].class);
+ long startTime = workflowClock.currentTimeMillis();
+ boolean replaying = contextProvider.getDecisionContext().getWorkflowClock().isReplaying();
+ LOGGER.debug("Processing workflow execute");
+
+ Object r = swfWorkflowConsumer.processWorkflow(parameters, startTime, replaying);
+ if (r instanceof Promise) {
+ methodResult.set((Promise) r);
+ } else if (r != null) {
+ methodResult.set(new Settable(r));
+ }
+ }
+
+ @Override
+ protected void doCatch(Throwable e) throws Throwable {
+ if (!(e instanceof CancellationException) || !decisionContext.getWorkflowContext().isCancelRequested()) {
+ throwWorkflowException(dataConverter, e);
+ }
+ }
+
+ @Override
+ protected void doFinally() throws Throwable {
+ Promise r = methodResult.get();
+ if (r == null || r.isReady()) {
+ Object workflowResult = r == null ? null : r.get();
+ String convertedResult = dataConverter.toData(workflowResult);
+ result.set(convertedResult);
+ }
+ }
+ };
+
+ return result;
+ }
+
+ @Override
+ public void signalRecieved(String signalName, String input) throws WorkflowException {
+ Object[] parameters = dataConverter.fromData(input, Object[].class);
+ try {
+ LOGGER.debug("Processing workflow signalRecieved");
+
+ swfWorkflowConsumer.signalRecieved(parameters);
+ } catch (Throwable e) {
+ throwWorkflowException(dataConverter, e);
+ throw new IllegalStateException("Unreacheable");
+ }
+ }
+
+ @Override
+ public String getWorkflowState() throws WorkflowException {
+ try {
+ LOGGER.debug("Processing workflow getWorkflowState");
+
+ Object result = swfWorkflowConsumer.getWorkflowState(null);
+ return dataConverter.toData(result);
+ } catch (Throwable e) {
+ throwWorkflowException(dataConverter, e);
+ throw new IllegalStateException("Unreachable");
+ }
+ }
+
+ private void throwWorkflowException(DataConverter c, Throwable exception) throws WorkflowException {
+ if (exception instanceof WorkflowException) {
+ throw (WorkflowException) exception;
+ }
+ String reason = WorkflowExecutionUtils.truncateReason(exception.getMessage());
+ String details = null;
+ try {
+ details = c.toData(exception);
+ } catch (DataConverterException dataConverterException) {
+ if (dataConverterException.getCause() == null) {
+ dataConverterException.initCause(exception);
+ }
+ throw dataConverterException;
+ }
+
+ throw new WorkflowException(reason, details);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactory.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactory.java
new file mode 100644
index 0000000..84fe62f
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.flow.DecisionContext;
+import com.amazonaws.services.simpleworkflow.flow.JsonDataConverter;
+import com.amazonaws.services.simpleworkflow.flow.WorkflowTypeRegistrationOptions;
+import com.amazonaws.services.simpleworkflow.flow.generic.WorkflowDefinition;
+import com.amazonaws.services.simpleworkflow.flow.generic.WorkflowDefinitionFactory;
+import com.amazonaws.services.simpleworkflow.flow.worker.CurrentDecisionContext;
+import com.amazonaws.services.simpleworkflow.model.WorkflowType;
+
+public class CamelWorkflowDefinitionFactory extends WorkflowDefinitionFactory {
+ private SWFWorkflowConsumer swfWorkflowConsumer;
+ private WorkflowType workflowType;
+ private WorkflowTypeRegistrationOptions registrationOptions;
+
+ public CamelWorkflowDefinitionFactory(SWFWorkflowConsumer swfWorkflowConsumer, WorkflowType workflowType, WorkflowTypeRegistrationOptions registrationOptions) {
+ this.swfWorkflowConsumer = swfWorkflowConsumer;
+ this.workflowType = workflowType;
+ this.registrationOptions = registrationOptions;
+ }
+
+ @Override
+ public WorkflowTypeRegistrationOptions getWorkflowRegistrationOptions() {
+ return registrationOptions;
+ }
+
+ @Override
+ public WorkflowDefinition getWorkflowDefinition(DecisionContext context) throws Exception {
+ CurrentDecisionContext.set(context);
+ return new CamelWorkflowDefinition(swfWorkflowConsumer, context, new JsonDataConverter());
+ }
+
+ @Override
+ public void deleteWorkflowDefinition(WorkflowDefinition instance) {
+ CurrentDecisionContext.unset();
+ }
+
+ @Override
+ public WorkflowType getWorkflowType() {
+ return workflowType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactoryFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactoryFactory.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactoryFactory.java
new file mode 100644
index 0000000..d3cdecc
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/CamelWorkflowDefinitionFactoryFactory.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.ArrayList;
+
+import com.amazonaws.services.simpleworkflow.flow.WorkflowTypeRegistrationOptions;
+import com.amazonaws.services.simpleworkflow.flow.generic.WorkflowDefinitionFactory;
+import com.amazonaws.services.simpleworkflow.flow.generic.WorkflowDefinitionFactoryFactory;
+import com.amazonaws.services.simpleworkflow.model.WorkflowType;
+
+public class CamelWorkflowDefinitionFactoryFactory extends WorkflowDefinitionFactoryFactory {
+ private final SWFWorkflowConsumer swfWorkflowConsumer;
+ private final SWFConfiguration configuration;
+
+ public CamelWorkflowDefinitionFactoryFactory(SWFWorkflowConsumer swfWorkflowConsumer, SWFConfiguration configuration) {
+ this.swfWorkflowConsumer = swfWorkflowConsumer;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public WorkflowDefinitionFactory getWorkflowDefinitionFactory(WorkflowType workflowType) {
+ WorkflowTypeRegistrationOptions registrationOptions = configuration.getWorkflowTypeRegistrationOptions() != null
+ ? configuration.getWorkflowTypeRegistrationOptions() : new WorkflowTypeRegistrationOptions();
+
+ return new CamelWorkflowDefinitionFactory(swfWorkflowConsumer, workflowType, registrationOptions);
+ }
+
+ @Override
+ public Iterable<WorkflowType> getWorkflowTypesToRegister() {
+ ArrayList<WorkflowType> workflowTypes = new ArrayList<WorkflowType>(1);
+ WorkflowType workflowType = new WorkflowType();
+ workflowType.setName(configuration.getEventName());
+ workflowType.setVersion(configuration.getVersion());
+ workflowTypes.add(workflowType);
+ return workflowTypes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityConsumer.java
new file mode 100644
index 0000000..0f47606
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityConsumer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.flow.worker.GenericActivityWorker;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SWFActivityConsumer extends DefaultConsumer {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(SWFWorkflowProducer.class);
+ private SWFEndpoint endpoint;
+ private final SWFConfiguration configuration;
+ private GenericActivityWorker genericWorker;
+
+ public SWFActivityConsumer(SWFEndpoint endpoint, Processor processor, SWFConfiguration configuration) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ }
+
+ public Object processActivity(Object[] inputParameters, String taskToken) throws Exception {
+ LOGGER.debug("Processing activity task: " + inputParameters);
+
+ Exchange exchange = endpoint.createExchange(inputParameters, SWFConstants.EXECUTE_ACTION);
+ exchange.getIn().setHeader(SWFConstants.TASK_TOKEN, taskToken);
+
+ getProcessor().process(exchange);
+ return endpoint.getResult(exchange);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ CamelActivityImplementationFactory factory = new CamelActivityImplementationFactory(this, configuration);
+ genericWorker = new GenericActivityWorker(endpoint.getSWClient(), configuration.getDomainName(), configuration.getActivityList());
+ genericWorker.setActivityImplementationFactory(factory);
+ genericWorker.start();
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ genericWorker.setDisableServiceShutdownOnStop(true);
+ genericWorker.shutdownNow();
+ super.doStop();
+ }
+
+ @Override
+ public String toString() {
+ return "SWFActivityConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityProducer.java
new file mode 100644
index 0000000..7b364ac
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFActivityProducer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SWFActivityProducer extends DefaultProducer {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(SWFActivityProducer.class);
+ private final CamelSWFActivityClient camelSWFClient;
+ private SWFEndpoint endpoint;
+ private SWFConfiguration configuration;
+
+ public SWFActivityProducer(SWFEndpoint endpoint, CamelSWFActivityClient camelSWFActivityClient) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ this.configuration = endpoint.getConfiguration();
+ this.camelSWFClient = camelSWFActivityClient;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ String eventName = getEventName(exchange);
+ String version = getVersion(exchange);
+ LOGGER.debug("scheduleActivity : " + eventName + " : " + version);
+
+ Object result = camelSWFClient.scheduleActivity(eventName, version, exchange.getIn().getBody());
+ endpoint.setResult(exchange, result);
+ }
+
+ private String getEventName(Exchange exchange) {
+ String eventName = exchange.getIn().getHeader(SWFConstants.EVENT_NAME, String.class);
+ return eventName != null ? eventName : configuration.getEventName();
+ }
+
+ private String getVersion(Exchange exchange) {
+ String version = exchange.getIn().getHeader(SWFConstants.VERSION, String.class);
+ return version != null ? version : configuration.getVersion();
+ }
+
+ @Override
+ public String toString() {
+ return "SWFActivityProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFComponent.java
new file mode 100644
index 0000000..538ebd7
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.IntrospectionSupport;
+
+/**
+ * Defines the <a href="http://aws.amazon.com/swf/">Amazon Simple Workflow Component</a>
+ */
+public class SWFComponent extends DefaultComponent {
+
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ Map<String, Object> clientConfigurationParameters = IntrospectionSupport.extractProperties(parameters, "clientConfiguration.");
+ Map<String, Object> sWClientParameters = IntrospectionSupport.extractProperties(parameters, "sWClient.");
+ Map<String, Object> startWorkflowOptionsParameters = IntrospectionSupport.extractProperties(parameters, "startWorkflowOptions.");
+
+ SWFConfiguration configuration = new SWFConfiguration();
+ configuration.setType(remaining);
+ setProperties(configuration, parameters);
+ configuration.setClientConfigurationParameters(clientConfigurationParameters);
+ configuration.setsWClientParameters(sWClientParameters);
+ configuration.setStartWorkflowOptionsParameters(startWorkflowOptionsParameters);
+
+ return new SWFEndpoint(uri, this, configuration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConfiguration.java
new file mode 100644
index 0000000..5d9513f
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConfiguration.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.Map;
+
+import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
+import com.amazonaws.services.simpleworkflow.flow.ActivitySchedulingOptions;
+import com.amazonaws.services.simpleworkflow.flow.DataConverter;
+import com.amazonaws.services.simpleworkflow.flow.WorkflowTypeRegistrationOptions;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeExecutionOptions;
+import com.amazonaws.services.simpleworkflow.flow.worker.ActivityTypeRegistrationOptions;
+
+
+public class SWFConfiguration {
+ private String accessKey;
+ private String secretKey;
+ private AmazonSimpleWorkflowClient amazonSWClient;
+ private Map<String, Object> clientConfigurationParameters;
+ private Map<String, Object> sWClientParameters;
+ private Map<String, Object> startWorkflowOptionsParameters;
+ private String operation;
+ private String domainName;
+ private String activityList;
+ private String workflowList;
+ private String eventName;
+ private String version;
+ private String type;
+ private String signalName;
+ private String childPolicy;
+ private String terminationReason;
+ private String stateResultType;
+ private String terminationDetails;
+ private DataConverter dataConverter;
+ private ActivitySchedulingOptions activitySchedulingOptions;
+ private ActivityTypeExecutionOptions activityTypeExecutionOptions;
+ private ActivityTypeRegistrationOptions activityTypeRegistrationOptions;
+ private WorkflowTypeRegistrationOptions workflowTypeRegistrationOptions;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getDomainName() {
+ return domainName;
+ }
+
+ public void setDomainName(String domainName) {
+ this.domainName = domainName;
+ }
+
+ public String getActivityList() {
+ return activityList;
+ }
+
+ public void setActivityList(String activityList) {
+ this.activityList = activityList;
+ }
+
+ public String getWorkflowList() {
+ return workflowList;
+ }
+
+ public void setWorkflowList(String workflowList) {
+ this.workflowList = workflowList;
+ }
+
+ public String getEventName() {
+ return eventName;
+ }
+
+ public void setEventName(String eventName) {
+ this.eventName = eventName;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Map<String, Object> getClientConfigurationParameters() {
+ return clientConfigurationParameters;
+ }
+
+ public void setClientConfigurationParameters(Map<String, Object> clientConfigurationParameters) {
+ this.clientConfigurationParameters = clientConfigurationParameters;
+ }
+
+ public Map<String, Object> getsWClientParameters() {
+ return sWClientParameters;
+ }
+
+ public void setsWClientParameters(Map<String, Object> sWClientParameters) {
+ this.sWClientParameters = sWClientParameters;
+ }
+
+ public AmazonSimpleWorkflowClient getAmazonSWClient() {
+ return amazonSWClient;
+ }
+
+ public void setAmazonSWClient(AmazonSimpleWorkflowClient amazonSWClient) {
+ this.amazonSWClient = amazonSWClient;
+ }
+ public Map<String, Object> getStartWorkflowOptionsParameters() {
+ return startWorkflowOptionsParameters;
+ }
+
+ public void setStartWorkflowOptionsParameters(Map<String, Object> startWorkflowOptionsParameters) {
+ this.startWorkflowOptionsParameters = startWorkflowOptionsParameters;
+ }
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public String getSignalName() {
+ return signalName;
+ }
+
+ public void setSignalName(String signalName) {
+ this.signalName = signalName;
+ }
+
+ public String getChildPolicy() {
+ return childPolicy;
+ }
+
+ public void setChildPolicy(String childPolicy) {
+ this.childPolicy = childPolicy;
+ }
+
+ public String getTerminationReason() {
+ return terminationReason;
+ }
+
+ public void setTerminationReason(String terminationReason) {
+ this.terminationReason = terminationReason;
+ }
+
+ public String getStateResultType() {
+ return stateResultType;
+ }
+
+ public void setStateResultType(String stateResultType) {
+ this.stateResultType = stateResultType;
+ }
+
+ public String getTerminationDetails() {
+ return terminationDetails;
+ }
+
+ public void setTerminationDetails(String terminationDetails) {
+ this.terminationDetails = terminationDetails;
+ }
+
+ public ActivityTypeExecutionOptions getActivityTypeExecutionOptions() {
+ return activityTypeExecutionOptions;
+ }
+
+ public void setActivityTypeExecutionOptions(ActivityTypeExecutionOptions activityTypeExecutionOptions) {
+ this.activityTypeExecutionOptions = activityTypeExecutionOptions;
+ }
+
+ public ActivityTypeRegistrationOptions getActivityTypeRegistrationOptions() {
+ return activityTypeRegistrationOptions;
+ }
+
+ public void setActivityTypeRegistrationOptions(ActivityTypeRegistrationOptions activityTypeRegistrationOptions) {
+ this.activityTypeRegistrationOptions = activityTypeRegistrationOptions;
+ }
+
+ public DataConverter getDataConverter() {
+ return dataConverter;
+ }
+
+ public void setDataConverter(DataConverter dataConverter) {
+ this.dataConverter = dataConverter;
+ }
+
+ public WorkflowTypeRegistrationOptions getWorkflowTypeRegistrationOptions() {
+ return workflowTypeRegistrationOptions;
+ }
+
+ public void setWorkflowTypeRegistrationOptions(WorkflowTypeRegistrationOptions workflowTypeRegistrationOptions) {
+ this.workflowTypeRegistrationOptions = workflowTypeRegistrationOptions;
+ }
+
+ public ActivitySchedulingOptions getActivitySchedulingOptions() {
+ return activitySchedulingOptions;
+ }
+
+ public void setActivitySchedulingOptions(ActivitySchedulingOptions activitySchedulingOptions) {
+ this.activitySchedulingOptions = activitySchedulingOptions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConstants.java
new file mode 100644
index 0000000..6bb71cb
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+public interface SWFConstants {
+ String OPERATION = "CamelSWFOperation";
+ String WORKFLOW_ID = "CamelSWFWorkflowId";
+ String RUN_ID = "CamelSWFRunId";
+ String STATE_RESULT_TYPE = "CamelSWFStateResultType";
+ String EVENT_NAME = "CamelSWFEventName";
+ String VERSION = "CamelSWFVersion";
+ String SIGNAL_NAME = "CamelSWFSignalName";
+ String CHILD_POLICY = "CamelSWFChildPolicy";
+ String DETAILS = "CamelSWFDetails";
+ String REASON = "CamelSWFReason";
+ String ACTION = "CamelSWFAction";
+ String EXECUTE_ACTION = "CamelSWFActionExecute";
+ String SIGNAL_RECEIVED_ACTION = "CamelSWFSignalReceivedAction";
+ String GET_STATE_ACTION = "CamelSWFGetStateAction";
+ String TASK_TOKEN = "CamelSWFTaskToken";
+ String WORKFLOW_START_TIME = "CamelSWFWorkflowStartTime";
+ String WORKFLOW_REPLAYING = "CamelSWFWorkflowReplaying";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFEndpoint.java
new file mode 100644
index 0000000..f50c7bd
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFEndpoint.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
+import com.amazonaws.services.simpleworkflow.flow.StartWorkflowOptions;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * Defines the <a href="http://aws.amazon.com/swf/">Amazon Simple Workflow Endpoint</a>
+ */
+public class SWFEndpoint extends DefaultEndpoint {
+ private SWFConfiguration configuration;
+ private AmazonSimpleWorkflowClient amazonSWClient;
+
+ public SWFEndpoint() {
+ }
+
+ public SWFEndpoint(String uri, SWFComponent component, SWFConfiguration configuration) {
+ super(uri, component);
+ this.configuration = configuration;
+ }
+
+ public Producer createProducer() throws Exception {
+ return isWorkflow()
+ ? new SWFWorkflowProducer(this, new CamelSWFWorkflowClient(this, configuration)) : new SWFActivityProducer(this, new CamelSWFActivityClient(configuration));
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return isWorkflow()
+ ? new SWFWorkflowConsumer(this, processor, configuration) : new SWFActivityConsumer(this, processor, configuration);
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (configuration.getAmazonSWClient() == null) {
+ amazonSWClient = createSWClient();
+ }
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (amazonSWClient != null) {
+ amazonSWClient.shutdown();
+ amazonSWClient = null;
+ }
+ super.doStop();
+ }
+
+ public AmazonSimpleWorkflowClient getSWClient() {
+ return configuration.getAmazonSWClient() != null ? configuration.getAmazonSWClient() : amazonSWClient;
+ }
+
+ private AmazonSimpleWorkflowClient createSWClient() throws Exception {
+ AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+
+ ClientConfiguration clientConfiguration = new ClientConfiguration();
+ if (!configuration.getClientConfigurationParameters().isEmpty()) {
+ setProperties(clientConfiguration, configuration.getClientConfigurationParameters());
+ }
+
+ AmazonSimpleWorkflowClient client = new AmazonSimpleWorkflowClient(credentials, clientConfiguration);
+ if (!configuration.getsWClientParameters().isEmpty()) {
+ setProperties(client, configuration.getsWClientParameters());
+ }
+ return client;
+ }
+
+ public StartWorkflowOptions getStartWorkflowOptions() {
+ StartWorkflowOptions startWorkflowOptions = new StartWorkflowOptions();
+ try {
+ EndpointHelper.setReferenceProperties(getCamelContext(), startWorkflowOptions, configuration.getStartWorkflowOptionsParameters());
+ EndpointHelper.setProperties(getCamelContext(), startWorkflowOptions, configuration.getStartWorkflowOptionsParameters());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return startWorkflowOptions;
+ }
+
+ private boolean isWorkflow() {
+ return configuration.getType().equalsIgnoreCase("workflow");
+ }
+
+ public Exchange createExchange(Object request, String action) {
+ Exchange exchange = createExchange(ExchangePattern.InOut);
+ exchange.getIn().setBody(request);
+ exchange.getIn().setHeader(SWFConstants.ACTION, action);
+ return exchange;
+ }
+
+ public Object getResult(Exchange exchange) {
+ return ExchangeHelper.isOutCapable(exchange) ? exchange.getOut().getBody() : exchange.getIn().getBody();
+ }
+
+ public void setResult(Exchange exchange, Object result) {
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ exchange.getOut().setBody(result);
+ } else {
+ exchange.getIn().setBody(result);
+ }
+ }
+
+ public void setConfiguration(SWFConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ public SWFConfiguration getConfiguration() {
+ return configuration;
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowConsumer.java
new file mode 100644
index 0000000..842917a
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowConsumer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.flow.worker.GenericWorkflowWorker;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.ExchangePattern.InOnly;
+
+public class SWFWorkflowConsumer extends DefaultConsumer {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(SWFWorkflowProducer.class);
+ private SWFEndpoint endpoint;
+ private final SWFConfiguration configuration;
+ private GenericWorkflowWorker genericWorker;
+
+ public SWFWorkflowConsumer(SWFEndpoint endpoint, Processor processor, SWFConfiguration configuration) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ this.configuration = configuration;
+ }
+
+ public Object processWorkflow(Object[] parameters, long startTime, boolean replaying) throws Exception {
+ LOGGER.debug("Processing workflow task: " + parameters);
+ Exchange exchange = endpoint.createExchange(parameters, SWFConstants.EXECUTE_ACTION);
+ exchange.getIn().setHeader(SWFConstants.WORKFLOW_START_TIME, startTime);
+ exchange.getIn().setHeader(SWFConstants.WORKFLOW_REPLAYING, replaying);
+
+ getProcessor().process(exchange);
+ return endpoint.getResult(exchange);
+ }
+
+ public void signalRecieved(Object[] parameters) throws Exception {
+ LOGGER.debug("signalRecieved: " + parameters);
+
+ Exchange exchange = endpoint.createExchange(parameters, SWFConstants.SIGNAL_RECEIVED_ACTION);
+ exchange.setPattern(InOnly);
+ getProcessor().process(exchange);
+ }
+
+ public Object getWorkflowState(Object parameters) throws Exception {
+ LOGGER.debug("getWorkflowState: " + parameters);
+
+ Exchange exchange = endpoint.createExchange(parameters, SWFConstants.GET_STATE_ACTION);
+ getProcessor().process(exchange);
+ return endpoint.getResult(exchange);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ CamelWorkflowDefinitionFactoryFactory factoryFactory = new CamelWorkflowDefinitionFactoryFactory(this, configuration);
+ genericWorker = new GenericWorkflowWorker(endpoint.getSWClient(), configuration.getDomainName(), configuration.getWorkflowList());
+ genericWorker.setWorkflowDefinitionFactoryFactory(factoryFactory);
+ genericWorker.start();
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ genericWorker.setDisableServiceShutdownOnStop(true);
+ genericWorker.shutdownNow();
+ super.doStop();
+ }
+
+ @Override
+ public String toString() {
+ return "SWFWorkflowConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowProducer.java
new file mode 100644
index 0000000..aa6913b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/swf/SWFWorkflowProducer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SWFWorkflowProducer extends DefaultProducer {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(SWFWorkflowProducer.class);
+ private final CamelSWFWorkflowClient camelSWFClient;
+ private SWFEndpoint endpoint;
+ private SWFConfiguration configuration;
+
+ public SWFWorkflowProducer(SWFEndpoint endpoint, CamelSWFWorkflowClient camelSWFClient) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ this.configuration = endpoint.getConfiguration();
+ this.camelSWFClient = camelSWFClient;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ LOGGER.debug("processing workflow task " + exchange);
+
+ try {
+
+ Operation operation = getOperation(exchange);
+ switch (operation) {
+
+ case CANCEL:
+ camelSWFClient.requestCancelWorkflowExecution(getWorkflowId(exchange), getRunId(exchange));
+ break;
+
+ case GET_STATE:
+ Object state = camelSWFClient.getWorkflowExecutionState(getWorkflowId(exchange), getRunId(exchange), getResultType(exchange));
+ endpoint.setResult(exchange, state);
+ break;
+
+ case DESCRIBE:
+ Map<String, Object> workflowInfo = camelSWFClient.describeWorkflowInstance(getWorkflowId(exchange), getRunId(exchange));
+ endpoint.setResult(exchange, workflowInfo);
+ break;
+
+ case GET_HISTORY:
+ Object history = camelSWFClient.getWorkflowExecutionHistory(getWorkflowId(exchange), getRunId(exchange));
+ endpoint.setResult(exchange, history);
+ break;
+
+ case START:
+ String[] ids = camelSWFClient.startWorkflowExecution(getWorkflowId(exchange), getRunId(exchange),
+ getEventName(exchange), getVersion(exchange), getArguments(exchange));
+ setHeader(exchange, SWFConstants.WORKFLOW_ID, ids[0]);
+ setHeader(exchange, SWFConstants.RUN_ID, ids[1]);
+ break;
+
+ case SIGNAL:
+ camelSWFClient.signalWorkflowExecution(getWorkflowId(exchange), getRunId(exchange), getSignalName(exchange), getArguments(exchange));
+ break;
+
+ case TERMINATE:
+ camelSWFClient.terminateWorkflowExecution(getWorkflowId(exchange), getRunId(exchange), getReason(exchange), getDetails(exchange), getChildPolicy(exchange));
+ break;
+
+ default:
+ throw new UnsupportedOperationException(operation.toString());
+ }
+
+ } catch (Throwable throwable) {
+ throw new Exception(throwable);
+ }
+ }
+
+ private String getEventName(Exchange exchange) {
+ String eventName = exchange.getIn().getHeader(SWFConstants.EVENT_NAME, String.class);
+ return eventName != null ? eventName : configuration.getEventName();
+ }
+
+ private String getVersion(Exchange exchange) {
+ String version = exchange.getIn().getHeader(SWFConstants.VERSION, String.class);
+ return version != null ? version : configuration.getVersion();
+ }
+
+ private String getSignalName(Exchange exchange) {
+ String signalName = exchange.getIn().getHeader(SWFConstants.SIGNAL_NAME, String.class);
+ return signalName != null ? signalName : configuration.getSignalName();
+ }
+
+ private String getChildPolicy(Exchange exchange) {
+ String childPolicy = exchange.getIn().getHeader(SWFConstants.CHILD_POLICY, String.class);
+ return childPolicy != null ? childPolicy : configuration.getChildPolicy();
+ }
+
+ private String getDetails(Exchange exchange) {
+ String details = exchange.getIn().getHeader(SWFConstants.DETAILS, String.class);
+ return details != null ? details : configuration.getTerminationDetails();
+ }
+
+ private String getReason(Exchange exchange) {
+ String reason = exchange.getIn().getHeader(SWFConstants.REASON, String.class);
+ return reason != null ? reason : configuration.getTerminationReason();
+ }
+
+ private String getWorkflowId(Exchange exchange) {
+ return exchange.getIn().getHeader(SWFConstants.WORKFLOW_ID, String.class);
+ }
+
+ private String getRunId(Exchange exchange) {
+ return exchange.getIn().getHeader(SWFConstants.RUN_ID, String.class);
+ }
+
+ private Class getResultType(Exchange exchange) throws ClassNotFoundException {
+ String type = exchange.getIn().getHeader(SWFConstants.STATE_RESULT_TYPE, String.class);
+ if (type == null) {
+ type = configuration.getStateResultType();
+ }
+
+ return type != null ? Class.forName(type) : Object.class;
+ }
+
+ private Operation getOperation(Exchange exchange) {
+ String operation = exchange.getIn().getHeader(SWFConstants.OPERATION, String.class);
+ if (operation == null) {
+ operation = configuration.getOperation();
+ }
+
+ return operation != null ? Operation.valueOf(operation) : Operation.START;
+ }
+
+ private void setHeader(Exchange exchange, String key, Object value) {
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ exchange.getOut().setHeader(key, value);
+ } else {
+ exchange.getIn().setHeader(key, value);
+ }
+ }
+
+ private Object getArguments(Exchange exchange) {
+ return exchange.getIn().getBody();
+ }
+
+
+ @Override
+ public String toString() {
+ return "SWFWorkflowProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+
+ private enum Operation {
+ SIGNAL, CANCEL, TERMINATE, GET_STATE, START, DESCRIBE, GET_HISTORY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-swf
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-swf b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-swf
new file mode 100644
index 0000000..5e33c5b
--- /dev/null
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-swf
@@ -0,0 +1 @@
+class=org.apache.camel.component.aws.swf.SWFComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/AmazonSWFClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/AmazonSWFClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/AmazonSWFClientMock.java
new file mode 100644
index 0000000..172adcf
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/AmazonSWFClientMock.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
+import com.amazonaws.services.simpleworkflow.model.Run;
+import com.amazonaws.services.simpleworkflow.model.StartWorkflowExecutionRequest;
+
+public class AmazonSWFClientMock extends AmazonSimpleWorkflowClient {
+
+ @Override
+ public Run startWorkflowExecution(StartWorkflowExecutionRequest startWorkflowExecutionRequest) throws AmazonServiceException, AmazonClientException {
+ return new Run().withRunId("run1");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityClientTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityClientTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityClientTest.java
new file mode 100644
index 0000000..c7883cc
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityClientTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.flow.ActivitySchedulingOptions;
+import com.amazonaws.services.simpleworkflow.flow.DynamicActivitiesClient;
+import com.amazonaws.services.simpleworkflow.flow.core.Promise;
+import com.amazonaws.services.simpleworkflow.model.ActivityType;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class CamelSWFActivityClientTest {
+
+ private DynamicActivitiesClient activitiesClient;
+ private CamelSWFActivityClient camelSWFActivityClient;
+
+ @Before
+ public void setUp() throws Exception {
+ activitiesClient = mock(DynamicActivitiesClient.class);
+ camelSWFActivityClient = new CamelSWFActivityClient(new SWFConfiguration()) {
+ @Override
+ DynamicActivitiesClient getDynamicActivitiesClient() {
+ return activitiesClient;
+ }
+ };
+ }
+
+ @Test
+ public void testScheduleActivity() throws Exception {
+ Object result = camelSWFActivityClient.scheduleActivity("eventName", "version", "input");
+ verify(activitiesClient).scheduleActivity(any(ActivityType.class), any(Promise[].class), isNull(ActivitySchedulingOptions.class), any(Class.class), isNull(Promise.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityConsumerTest.java
new file mode 100644
index 0000000..33e8a47
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFActivityConsumerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.model.ActivityTask;
+import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CamelSWFActivityConsumerTest extends CamelSWFTestSupport {
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ public void configure() throws Exception {
+ from("aws-swf://activity?" + options)
+ .to("mock:result");
+ }
+ };
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test
+ public void receivesDecisionTask() throws Exception {
+ result.expectedMinimumMessageCount(1);
+ result.expectedMessagesMatches(new Predicate() {
+ public boolean matches(Exchange exchange) {
+ return exchange.getIn().getHeader(SWFConstants.ACTION).equals(SWFConstants.EXECUTE_ACTION)
+ && exchange.getIn().getBody(Object[].class)[0].equals("test");
+ }
+ });
+
+ ActivityTask activityTask = new ActivityTask();
+ activityTask.setTaskToken("token");
+ activityTask.setInput("[\"[Ljava.lang.Object;\",[\"test\"]]");
+ when(amazonSWClient.pollForActivityTask(any(PollForActivityTaskRequest.class))).thenReturn(activityTask);
+
+ context.start();
+
+ assertMockEndpointsSatisfied();
+ verify(amazonSWClient, atLeastOnce()).pollForActivityTask(any(PollForActivityTaskRequest.class));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/1c1d3a88/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFTestSupport.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFTestSupport.java
new file mode 100644
index 0000000..f7ad1c0
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/swf/CamelSWFTestSupport.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.swf;
+
+import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import static org.mockito.Mockito.mock;
+
+public class CamelSWFTestSupport extends CamelTestSupport {
+
+ protected String options =
+ "accessKey=key"
+ + "&secretKey=secret"
+ + "&domainName=testDomain"
+ + "&activityList=swf-alist"
+ + "&workflowList=swf-wlist"
+ + "&version=1.0"
+ + "&eventName=testEvent"
+ + "&amazonSWClient=#amazonSWClient";
+
+ @EndpointInject(uri = "direct:start")
+ protected ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint result;
+
+ protected AmazonSimpleWorkflowClient amazonSWClient;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ amazonSWClient = mock(AmazonSimpleWorkflowClient.class);
+ registry.bind("amazonSWClient", amazonSWClient);
+ return registry;
+ }
+}
\ No newline at end of file