You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/05 17:12:51 UTC
[07/11] CAMEL-6428: camel-salesforce component. Thanks to Dhiraj
Bokde for the contribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
new file mode 100644
index 0000000..807fef5
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonValue;
+
+/**
+ * Picklist values of the Salesforce <code>NotifyForOperations</code> field.
+ * Each constant carries the picklist string used as its JSON representation.
+ */
+public enum NotifyForOperationsEnum {
+
+    CREATE("Create"),
+    UPDATE("Update"),
+    ALL("All");
+
+    /** Picklist string supplied to the constructor for this constant. */
+    final String value;
+
+    NotifyForOperationsEnum(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the picklist string of this constant; marked as the Jackson JSON value.
+     *
+     * @return the picklist string
+     */
+    @JsonValue
+    public String value() {
+        return value;
+    }
+
+    /**
+     * Looks up the constant whose picklist string equals the given value (exact match).
+     *
+     * @param value picklist string to resolve
+     * @return the matching constant
+     * @throws IllegalArgumentException if no constant carries the given string
+     */
+    @JsonCreator
+    public static NotifyForOperationsEnum forValue(String value) {
+        final NotifyForOperationsEnum[] candidates = values();
+        for (int i = 0; i < candidates.length; i++) {
+            final NotifyForOperationsEnum candidate = candidates[i];
+            if (candidate.value.equals(value)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException(value);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
new file mode 100644
index 0000000..2135a16
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import com.thoughtworks.xstream.annotations.XStreamConverter;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.apache.camel.component.salesforce.api.PicklistEnumConverter;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+
+/**
+ * Data transfer object for the Salesforce <code>PushTopic</code> SObject.
+ * Field names are capitalized and mirrored by the {@link JsonProperty} names on the accessors;
+ * the two picklist fields are converted by {@link PicklistEnumConverter} for XStream.
+ */
+@XStreamAlias("PushTopic")
+public class PushTopic extends AbstractSObjectBase {
+
+    // NOTE: field names and declaration order are kept as-is; the serializers map on them.
+    private String Query;
+    private Double ApiVersion;
+    private Boolean IsActive;
+    @XStreamConverter(PicklistEnumConverter.class)
+    private NotifyForFieldsEnum NotifyForFields;
+    @XStreamConverter(PicklistEnumConverter.class)
+    private NotifyForOperationsEnum NotifyForOperations;
+    private String Description;
+
+    /** @return the value of the <code>Query</code> field */
+    @JsonProperty("Query")
+    public String getQuery() {
+        return Query;
+    }
+
+    /** @param Query new value of the <code>Query</code> field */
+    @JsonProperty("Query")
+    public void setQuery(String Query) {
+        this.Query = Query;
+    }
+
+    /** @return the value of the <code>ApiVersion</code> field */
+    @JsonProperty("ApiVersion")
+    public Double getApiVersion() {
+        return ApiVersion;
+    }
+
+    /** @param ApiVersion new value of the <code>ApiVersion</code> field */
+    @JsonProperty("ApiVersion")
+    public void setApiVersion(Double ApiVersion) {
+        this.ApiVersion = ApiVersion;
+    }
+
+    /** @return the value of the <code>IsActive</code> field */
+    @JsonProperty("IsActive")
+    public Boolean getIsActive() {
+        return IsActive;
+    }
+
+    /** @param IsActive new value of the <code>IsActive</code> field */
+    @JsonProperty("IsActive")
+    public void setIsActive(Boolean IsActive) {
+        this.IsActive = IsActive;
+    }
+
+    /** @return the value of the <code>NotifyForFields</code> picklist field */
+    @JsonProperty("NotifyForFields")
+    public NotifyForFieldsEnum getNotifyForFields() {
+        return NotifyForFields;
+    }
+
+    /** @param NotifyForFields new value of the <code>NotifyForFields</code> picklist field */
+    @JsonProperty("NotifyForFields")
+    public void setNotifyForFields(NotifyForFieldsEnum NotifyForFields) {
+        this.NotifyForFields = NotifyForFields;
+    }
+
+    /** @return the value of the <code>NotifyForOperations</code> picklist field */
+    @JsonProperty("NotifyForOperations")
+    public NotifyForOperationsEnum getNotifyForOperations() {
+        return NotifyForOperations;
+    }
+
+    /** @param NotifyForOperations new value of the <code>NotifyForOperations</code> picklist field */
+    @JsonProperty("NotifyForOperations")
+    public void setNotifyForOperations(NotifyForOperationsEnum NotifyForOperations) {
+        this.NotifyForOperations = NotifyForOperations;
+    }
+
+    /** @return the value of the <code>Description</code> field */
+    @JsonProperty("Description")
+    public String getDescription() {
+        return Description;
+    }
+
+    /** @param Description new value of the <code>Description</code> field */
+    @JsonProperty("Description")
+    public void setDescription(String Description) {
+        this.Description = Description;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
new file mode 100644
index 0000000..4adc13c
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamImplicit;
+import org.apache.camel.component.salesforce.api.dto.AbstractQueryRecordsBase;
+
+import java.util.List;
+
+/**
+ * Query result DTO whose records are {@link PushTopic} instances.
+ * The record list is mapped by XStream as an implicit collection.
+ */
+public class QueryRecordsPushTopic extends AbstractQueryRecordsBase {
+
+    @XStreamImplicit
+    private List<PushTopic> records;
+
+    /**
+     * @return the list of records held by this result; the same list instance that was set
+     */
+    public List<PushTopic> getRecords() {
+        return this.records;
+    }
+
+    /**
+     * @param records list of records to hold; stored by reference
+     */
+    public void setRecords(List<PushTopic> records) {
+        this.records = records;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
new file mode 100644
index 0000000..caf59fe
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import com.thoughtworks.xstream.annotations.XStreamImplicit;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+
+import java.util.List;
+
+/**
+ * Container DTO for Salesforce REST errors, aliased as <code>Errors</code> for XStream
+ * with each list item mapped from an <code>Error</code> element.
+ */
+@XStreamAlias("Errors")
+public class RestErrors {
+
+    @XStreamImplicit(itemFieldName = "Error")
+    private List<RestError> errors;
+
+    /**
+     * @return the list of errors held by this container; the same list instance that was set
+     */
+    public List<RestError> getErrors() {
+        return this.errors;
+    }
+
+    /**
+     * @param errors list of errors to hold; stored by reference
+     */
+    public void setErrors(List<RestError> errors) {
+        this.errors = errors;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
new file mode 100644
index 0000000..1e40d18
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+import org.apache.camel.component.salesforce.internal.PayloadFormat;
+import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
+import org.apache.camel.component.salesforce.internal.client.RestClient;
+
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;
+
+public abstract class AbstractRestProcessor extends AbstractSalesforceProcessor {
+
+ protected static final String RESPONSE_CLASS = AbstractRestProcessor.class.getName() + ".responseClass";
+
+ private RestClient restClient;
+ private Map<String, Class<?>> classMap;
+
+    /**
+     * Creates the processor and its REST client for the given endpoint.
+     * The client is built from the inherited HTTP client and session, the configured API version,
+     * and the lower-cased name of the endpoint's payload format.
+     *
+     * @param endpoint endpoint supplying the configuration and the SObject class map
+     * @throws SalesforceException declared for the superclass constructor and REST client creation
+     */
+    public AbstractRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        final PayloadFormat payloadFormat = endpoint.getConfiguration().getPayloadFormat();
+
+        // Lower-case with a fixed locale: the no-arg toLowerCase() follows the JVM default locale,
+        // which can alter the result for some characters (e.g. 'I' under a Turkish locale).
+        final String formatName = payloadFormat.toString().toLowerCase(java.util.Locale.ROOT);
+
+        this.restClient = new DefaultRestClient(httpClient, endpointConfigMap.get(API_VERSION),
+            formatName, session);
+
+        this.classMap = endpoint.getComponent().getClassMap();
+    }
+
+ /**
+ * Starts the REST client owned by this processor via {@link ServiceHelper#startService(Object)}.
+ *
+ * @throws Exception if starting the REST client fails
+ */
+ @Override
+ public void start() throws Exception {
+ ServiceHelper.startService(restClient);
+ }
+
+ /**
+ * Stops the REST client owned by this processor via {@link ServiceHelper#stopService(Object)}.
+ *
+ * @throws Exception if stopping the REST client fails
+ */
+ @Override
+ public void stop() throws Exception {
+ ServiceHelper.stopService(restClient);
+ }
+
+    /**
+     * Pre-processes the request and dispatches the configured operation to the REST client.
+     * <p>
+     * Parameters are taken from an {@link AbstractSObjectBase} message body when one is present,
+     * otherwise from {@code getParameter}. Some operations clear fields on the input SObject
+     * before the request is built; those fields are restored by the response callback, or by the
+     * error handling here when the request cannot be dispatched.
+     * <p>
+     * Any failure before dispatch, including an operation this processor does not handle, is set
+     * on the exchange as a {@link SalesforceException} and the callback is completed synchronously.
+     *
+     * @param exchange the exchange being processed
+     * @param callback callback completed synchronously on failure; on success it is handed to
+     *                 {@code processResponse} together with the response
+     * @return {@code true} if processing completed synchronously because of an error,
+     *         {@code false} if the request was handed to the REST client
+     */
+    @Override
+    public final boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+        // pre-process request message
+        try {
+            processRequest(exchange);
+        } catch (SalesforceException e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        } catch (RuntimeException e) {
+            exchange.setException(new SalesforceException(e.getMessage(), e));
+            callback.done(true);
+            return true;
+        }
+
+        // State cleared on the input SObject while preparing the request. It is tracked here so
+        // that it can be put back if the request fails before reaching the REST client; on the
+        // success path the response callback restores it instead.
+        AbstractSObjectBase clearedObject = null;
+        String clearedId = null;
+        String clearedExtIdName = null;
+        Object clearedExtIdValue = null;
+
+        // call Salesforce asynchronously
+        try {
+            String sObjectName;
+
+            // call Operation using REST client
+            switch (operationName) {
+            case GET_VERSIONS:
+                restClient.getVersions(newResponseCallback(exchange, callback));
+                break;
+
+            case GET_RESOURCES:
+                restClient.getResources(newResponseCallback(exchange, callback));
+                break;
+
+            case GET_GLOBAL_OBJECTS:
+                restClient.getGlobalObjects(newResponseCallback(exchange, callback));
+                break;
+
+            case GET_BASIC_INFO:
+                sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+                restClient.getBasicInfo(sObjectName, newResponseCallback(exchange, callback));
+                break;
+
+            case GET_DESCRIPTION:
+                sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+                restClient.getDescription(sObjectName, newResponseCallback(exchange, callback));
+                break;
+
+            case GET_SOBJECT:
+            {
+                // determine parameters from input AbstractSObject
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                final String sObjectId;
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    sObjectId = sObjectBase.getId();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectId = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+
+                // use sObject name to load class
+                setResponseClass(exchange, sObjectName);
+
+                // get optional field list
+                final String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
+                String[] fields = null;
+                if (fieldsValue != null) {
+                    fields = fieldsValue.split(",");
+                }
+
+                restClient.getSObject(sObjectName, sObjectId, fields,
+                    newRestoringCallback(exchange, callback, sObjectBase, sObjectId, null, null));
+                break;
+            }
+
+            case CREATE_SOBJECT:
+            {
+                // determine parameters from input AbstractSObject
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.createSObject(sObjectName, getRequestStream(exchange),
+                    newResponseCallback(exchange, callback));
+                break;
+            }
+
+            case UPDATE_SOBJECT:
+            {
+                // determine parameters from input AbstractSObject
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                final String sObjectId;
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    // remember the sObject Id so it can be restored after the call, or on failure
+                    sObjectId = sObjectBase.getId();
+                    clearedObject = sObjectBase;
+                    clearedId = sObjectId;
+                    // clear base object fields, which cannot be updated
+                    sObjectBase.clearBaseFields();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
+                    newRestoringCallback(exchange, callback, sObjectBase, sObjectId, null, null));
+                break;
+            }
+
+            case DELETE_SOBJECT:
+            {
+                // determine parameters from input AbstractSObject
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                final String sObjectId;
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    sObjectId = sObjectBase.getId();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectId = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.deleteSObject(sObjectName, sObjectId,
+                    newRestoringCallback(exchange, callback, sObjectBase, sObjectId, null, null));
+                break;
+            }
+
+            case GET_SOBJECT_WITH_ID:
+            {
+                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
+                    exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                // determine parameters from input AbstractSObject
+                Object oldValue = null;
+                final String sObjectExtIdValue;
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                    clearedObject = sObjectBase;
+                    clearedExtIdName = sObjectExtIdName;
+                    clearedExtIdValue = oldValue;
+                    sObjectExtIdValue = externalIdAsString(sObjectBase, sObjectExtIdName, oldValue);
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+
+                // use sObject name to load class
+                setResponseClass(exchange, sObjectName);
+
+                restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                    newRestoringCallback(exchange, callback, sObjectBase, null, sObjectExtIdName, oldValue));
+                break;
+            }
+
+            case UPSERT_SOBJECT:
+            {
+                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
+                    IGNORE_BODY, NOT_OPTIONAL);
+
+                // determine parameters from input AbstractSObject
+                Object oldValue = null;
+                final String sObjectExtIdValue;
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                    clearedObject = sObjectBase;
+                    clearedExtIdName = sObjectExtIdName;
+                    clearedExtIdValue = oldValue;
+                    sObjectExtIdValue = externalIdAsString(sObjectBase, sObjectExtIdName, oldValue);
+                    // clear base object fields, which cannot be updated
+                    sObjectBase.clearBaseFields();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                    getRequestStream(exchange),
+                    newRestoringCallback(exchange, callback, sObjectBase, null, sObjectExtIdName, oldValue));
+                break;
+            }
+
+            case DELETE_SOBJECT_WITH_ID:
+            {
+                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
+                    IGNORE_BODY, NOT_OPTIONAL);
+
+                // determine parameters from input AbstractSObject
+                Object oldValue = null;
+                final String sObjectExtIdValue;
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                    clearedObject = sObjectBase;
+                    clearedExtIdName = sObjectExtIdName;
+                    clearedExtIdValue = oldValue;
+                    sObjectExtIdValue = externalIdAsString(sObjectBase, sObjectExtIdName, oldValue);
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                    newRestoringCallback(exchange, callback, sObjectBase, null, sObjectExtIdName, oldValue));
+                break;
+            }
+
+            case GET_BLOB_FIELD:
+            {
+                // get blob field name
+                final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
+                    exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                // determine parameters from input AbstractSObject
+                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                final String sObjectId;
+                if (sObjectBase != null) {
+                    sObjectName = sObjectBase.getClass().getSimpleName();
+                    sObjectId = sObjectBase.getId();
+                } else {
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    sObjectId = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+
+                restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
+                    newRestoringCallback(exchange, callback, sObjectBase, sObjectId, null, null));
+                break;
+            }
+
+            case QUERY:
+            {
+                final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+                // use custom response class property
+                setResponseClass(exchange, null);
+
+                restClient.query(sObjectQuery, newResponseCallback(exchange, callback));
+                break;
+            }
+
+            case QUERY_MORE:
+            {
+                // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
+                final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+                // use custom response class property
+                setResponseClass(exchange, null);
+
+                restClient.queryMore(nextRecordsUrl, newResponseCallback(exchange, callback));
+                break;
+            }
+
+            case SEARCH:
+            {
+                final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+
+                restClient.search(sObjectSearch, newResponseCallback(exchange, callback));
+                break;
+            }
+
+            default:
+                // Without this, an unhandled operation would return false below and the
+                // callback would never be completed.
+                throw new SalesforceException(
+                    String.format("Unsupported operation %s", operationName), null);
+            }
+
+        } catch (SalesforceException e) {
+            // put back anything cleared from the input SObject before reporting the failure
+            restoreFields(exchange, clearedObject, clearedId, clearedExtIdName, clearedExtIdValue);
+            exchange.setException(new SalesforceException(
+                String.format("Error processing %s: [%s] \"%s\"",
+                    operationName, e.getStatusCode(), e.getMessage()),
+                e));
+            callback.done(true);
+            return true;
+        } catch (RuntimeException e) {
+            // put back anything cleared from the input SObject before reporting the failure
+            restoreFields(exchange, clearedObject, clearedId, clearedExtIdName, clearedExtIdValue);
+            exchange.setException(new SalesforceException(
+                String.format("Unexpected Error processing %s: \"%s\"",
+                    operationName, e.getMessage()),
+                e));
+            callback.done(true);
+            return true;
+        }
+
+        // continue routing asynchronously
+        return false;
+    }
+
+    // Builds a response callback that only forwards the response to processResponse.
+    private RestClient.ResponseCallback newResponseCallback(final Exchange exchange, final AsyncCallback callback) {
+        return new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                // process response entity and create out message
+                processResponse(exchange, response, exception, callback);
+            }
+        };
+    }
+
+    // Builds a response callback that forwards the response to processResponse and then restores
+    // the Id and/or external id on the input SObject.
+    // NOTE(review): restoration runs after processResponse, as in the original code; whether
+    // processResponse already completes the callback is not visible here - confirm the ordering.
+    private RestClient.ResponseCallback newRestoringCallback(final Exchange exchange, final AsyncCallback callback,
+                                                             final AbstractSObjectBase sObjectBase,
+                                                             final String sObjectId,
+                                                             final String sObjectExtIdName,
+                                                             final Object oldValue) {
+        return new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+                restoreFields(exchange, sObjectBase, sObjectId, sObjectExtIdName, oldValue);
+            }
+        };
+    }
+
+    // Converts an external id value to its string form, failing with a descriptive exception
+    // instead of a NullPointerException when the property was not set on the input SObject.
+    private static String externalIdAsString(AbstractSObjectBase sObjectBase, String sObjectExtIdName,
+                                             Object value) throws SalesforceException {
+        if (value == null) {
+            throw new SalesforceException(
+                String.format("External id field %s is not set on SObject %s",
+                    sObjectExtIdName, sObjectBase.getClass().getSimpleName()),
+                null);
+        }
+        return value.toString();
+    }
+
+ private void restoreFields(Exchange exchange, AbstractSObjectBase sObjectBase,
+ String sObjectId, String sObjectExtIdName, Object oldValue) {
+ // restore fields
+ if (sObjectBase != null) {
+ // restore the Id if it was cleared
+ if (sObjectId != null) {
+ sObjectBase.setId(sObjectId);
+ }
+ // restore the external id if it was cleared
+ if (sObjectExtIdName != null && oldValue != null) {
+ try {
+ setPropertyValue(sObjectBase, sObjectExtIdName, oldValue);
+ } catch (SalesforceException e) {
+ // YES, the exchange may fail if the property cannot be reset!!!
+ exchange.setException(e);
+ }
+ }
+ }
+ }
+
+ private void setPropertyValue(AbstractSObjectBase sObjectBase, String name, Object value) throws SalesforceException {
+ try {
+ // set the value with the set method
+ Method setMethod = sObjectBase.getClass().getMethod("set" + name, value.getClass());
+ setMethod.invoke(sObjectBase, value);
+ } catch (NoSuchMethodException e) {
+ throw new SalesforceException(
+ String.format("SObject %s does not have a field %s",
+ sObjectBase.getClass().getName(), name),
+ e);
+ } catch (InvocationTargetException e) {
+ throw new SalesforceException(
+ String.format("Error setting value %s.%s",
+ sObjectBase.getClass().getSimpleName(), name),
+ e);
+ } catch (IllegalAccessException e) {
+ throw new SalesforceException(
+ String.format("Error accessing value %s.%s",
+ sObjectBase.getClass().getSimpleName(), name),
+ e);
+ }
+ }
+
+ private Object getAndClearPropertyValue(AbstractSObjectBase sObjectBase, String propertyName) throws SalesforceException {
+ try {
+ // obtain the value using the get method
+ Method getMethod = sObjectBase.getClass().getMethod("get" + propertyName);
+ Object value = getMethod.invoke(sObjectBase);
+
+ // clear the value with the set method
+ Method setMethod = sObjectBase.getClass().getMethod("set" + propertyName, getMethod.getReturnType());
+ setMethod.invoke(sObjectBase, new Object[] { null });
+
+ return value;
+ } catch (NoSuchMethodException e) {
+ throw new SalesforceException(
+ String.format("SObject %s does not have a field %s",
+ sObjectBase.getClass().getSimpleName(), propertyName),
+ e);
+ } catch (InvocationTargetException e) {
+ throw new SalesforceException(
+ String.format("Error getting/setting value %s.%s",
+ sObjectBase.getClass().getSimpleName(), propertyName),
+ e);
+ } catch (IllegalAccessException e) {
+ throw new SalesforceException(
+ String.format("Error accessing value %s.%s",
+ sObjectBase.getClass().getSimpleName(), propertyName),
+ e);
+ }
+ }
+
+ // Pre-process the request message; called first by process(), before any REST call is made.
+ // A SalesforceException thrown here is set on the exchange and ends processing synchronously.
+ protected abstract void processRequest(Exchange exchange) throws SalesforceException;
+
+ // Get the request stream from the In message; process() passes the result to the REST client
+ // as the request body of the create, update and upsert operations.
+ protected abstract InputStream getRequestStream(Exchange exchange) throws SalesforceException;
+
+    /**
+     * Determines the response class and stores it on the exchange under {@code RESPONSE_CLASS}.
+     * With an SObject name the class comes from the class map; without one the class named by
+     * the {@code SOBJECT_CLASS} parameter is resolved through the Camel context's class resolver.
+     *
+     * @param exchange    exchange that receives the {@code RESPONSE_CLASS} property
+     * @param sObjectName SObject name to look up, or {@code null} to use the custom class parameter
+     * @throws SalesforceException if no class is mapped for the name or the named class cannot be found
+     */
+    private void setResponseClass(Exchange exchange, String sObjectName) throws SalesforceException {
+        Class<?> responseClass;
+
+        if (sObjectName == null) {
+            // use custom response class property
+            final String className = getParameter(SOBJECT_CLASS, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            try {
+                responseClass = endpoint.getComponent().getCamelContext()
+                    .getClassResolver().resolveMandatoryClass(className);
+            } catch (ClassNotFoundException e) {
+                throw new SalesforceException(
+                    String.format("SObject class not found %s, %s",
+                        className, e.getMessage()),
+                    e);
+            }
+        } else {
+            // lookup class from class map
+            responseClass = classMap.get(sObjectName);
+            if (responseClass == null) {
+                throw new SalesforceException(String.format("No class found for SObject %s", sObjectName), null);
+            }
+        }
+
+        exchange.setProperty(RESPONSE_CLASS, responseClass);
+    }
+
+ // Process the response entity and set the out message in the exchange. Invoked from the REST
+ // client's response callback with the response stream and any exception reported with it.
+ // NOTE(review): the AsyncCallback is not completed anywhere else on the success path, so
+ // implementations are presumably responsible for completing it - confirm against subclasses.
+ protected abstract void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
new file mode 100644
index 0000000..e784458
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.OperationName;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Base class for Salesforce processors. Captures the endpoint, its
+ * configuration values, the operation name, the component session and the
+ * HTTP client, and offers a shared parameter lookup for subclasses.
+ */
+public abstract class AbstractSalesforceProcessor implements SalesforceProcessor {
+
+    // readable names for the boolean flags accepted by getParameter()
+    protected static final boolean NOT_OPTIONAL = false;
+    protected static final boolean IS_OPTIONAL = true;
+    protected static final boolean USE_BODY = true;
+    protected static final boolean IGNORE_BODY = false;
+
+    // logger named after the concrete processor class
+    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
+
+    protected final SalesforceEndpoint endpoint;
+    protected final Map<String, String> endpointConfigMap;
+
+    protected final OperationName operationName;
+    protected final SalesforceSession session;
+    protected final HttpClient httpClient;
+
+    /**
+     * Creates a processor bound to the given endpoint.
+     *
+     * @param endpoint endpoint supplying the operation name, configuration,
+     *                 component session and HTTP client
+     */
+    public AbstractSalesforceProcessor(SalesforceEndpoint endpoint) {
+        this.endpoint = endpoint;
+        this.operationName = endpoint.getOperationName();
+        this.endpointConfigMap = endpoint.getConfiguration().toValueMap();
+
+        this.session = endpoint.getComponent().getSession();
+        this.httpClient = endpoint.getConfiguration().getHttpClient();
+    }
+
+    @Override
+    public abstract boolean process(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Looks up a parameter value, trying in order the In message header,
+     * the endpoint configuration and, when requested, the In message body
+     * converted to a String.
+     *
+     * @param propName      name of the property
+     * @param exchange      exchange to inspect
+     * @param convertInBody if {@code true}, falls back to the In body as a String
+     * @param optional      if {@code true}, a missing value yields {@code null}
+     *                      instead of an exception
+     * @return value of the property, or {@code null} for an optional parameter that is not found
+     * @throws org.apache.camel.component.salesforce.api.SalesforceException
+     *          if a non-optional property can't be found
+     */
+    protected final String getParameter(String propName, Exchange exchange, boolean convertInBody, boolean optional) throws SalesforceException {
+        // 1. message header
+        String value = exchange.getIn().getHeader(propName, String.class);
+
+        // 2. endpoint configuration
+        if (value == null) {
+            value = endpointConfigMap.get(propName);
+        }
+
+        // 3. message body, only when the caller allows it
+        if (value == null && convertInBody) {
+            value = exchange.getIn().getBody(String.class);
+        }
+
+        // a mandatory property must have been found by now
+        if (value == null && !optional) {
+            throw new SalesforceException("Missing property " + propName, null);
+        }
+
+        return value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
new file mode 100644
index 0000000..6e070f7
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.*;
+import org.apache.camel.converter.stream.StreamCacheConverter;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.internal.client.BulkApiClient;
+import org.apache.camel.component.salesforce.internal.client.DefaultBulkApiClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;
+
+/**
+ * Processor for the Bulk API operations. Each operation reads its inputs
+ * from the In message body, headers or endpoint configuration, invokes the
+ * {@link BulkApiClient} and completes the exchange from the client callback.
+ */
+public class BulkApiProcessor extends AbstractSalesforceProcessor {
+
+    private final BulkApiClient bulkClient;
+
+    /**
+     * Creates the processor and its Bulk API client.
+     *
+     * @param endpoint endpoint this processor works for
+     * @throws SalesforceException if the Bulk API client can not be created
+     */
+    public BulkApiProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        this.bulkClient = new DefaultBulkApiClient(
+            endpointConfigMap.get(SalesforceEndpointConfig.API_VERSION), session, httpClient);
+    }
+
+    /**
+     * Dispatches the exchange to the Bulk API call matching the operation name.
+     *
+     * @param exchange exchange to process
+     * @param callback callback notified when the exchange is done
+     * @return {@code true} if the exchange failed before a request was sent and
+     *         was completed synchronously, {@code false} if it is completed
+     *         later from the client callback
+     */
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+        boolean done = false;
+        try {
+            switch (operationName) {
+            case CREATE_JOB:
+                bulkClient.createJob(exchange.getIn().getMandatoryBody(JobInfo.class),
+                    jobInfoCallback(exchange, callback));
+                break;
+
+            case GET_JOB:
+                bulkClient.getJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
+                break;
+
+            case CLOSE_JOB:
+                bulkClient.closeJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
+                break;
+
+            case ABORT_JOB:
+                bulkClient.abortJob(resolveJobId(exchange), jobInfoCallback(exchange, callback));
+                break;
+
+            case CREATE_BATCH: {
+                // since request is in the body, use headers or endpoint params
+                final ContentType contentType = ContentType.fromValue(
+                    getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                final String jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                InputStream request;
+                try {
+                    request = exchange.getIn().getMandatoryBody(InputStream.class);
+                } catch (CamelException e) {
+                    String msg = "Error preparing batch request: " + e.getMessage();
+                    throw new SalesforceException(msg, e);
+                }
+
+                bulkClient.createBatch(request, jobId, contentType, batchInfoCallback(exchange, callback));
+                break;
+            }
+
+            case GET_BATCH: {
+                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                final String jobId = resolveBatchJobId(batchBody, exchange);
+                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
+                bulkClient.getBatch(jobId, batchId, batchInfoCallback(exchange, callback));
+                break;
+            }
+
+            case GET_ALL_BATCHES:
+                bulkClient.getAllBatches(resolveJobId(exchange), new BulkApiClient.BatchInfoListResponseCallback() {
+                    @Override
+                    public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
+                        processResponse(exchange, batchInfoList, ex, callback);
+                    }
+                });
+                break;
+
+            case GET_REQUEST: {
+                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                final String jobId = resolveBatchJobId(batchBody, exchange);
+                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
+                bulkClient.getRequest(jobId, batchId,
+                    streamCallback(exchange, callback, "Error retrieving batch request: "));
+                break;
+            }
+
+            case GET_RESULTS: {
+                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                final String jobId = resolveBatchJobId(batchBody, exchange);
+                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
+                bulkClient.getResults(jobId, batchId,
+                    streamCallback(exchange, callback, "Error retrieving batch results: "));
+                break;
+            }
+
+            case CREATE_BATCH_QUERY: {
+                final JobInfo jobBody = exchange.getIn().getBody(JobInfo.class);
+                final String jobId;
+                final ContentType contentType;
+                final String soqlQuery;
+                if (jobBody != null) {
+                    jobId = jobBody.getId();
+                    contentType = jobBody.getContentType();
+                    // use SOQL query from header or endpoint config
+                    soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                } else {
+                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    contentType = ContentType.fromValue(
+                        getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                    // reuse SOBJECT_QUERY property
+                    soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+                }
+                bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
+                    batchInfoCallback(exchange, callback));
+                break;
+            }
+
+            case GET_QUERY_RESULT_IDS: {
+                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                final String jobId = resolveBatchJobId(batchBody, exchange);
+                final String batchId = resolveBatchId(batchBody, exchange, USE_BODY);
+                bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
+                    @Override
+                    public void onResponse(List<String> ids, SalesforceException ex) {
+                        processResponse(exchange, ids, ex, callback);
+                    }
+                });
+                break;
+            }
+
+            case GET_QUERY_RESULT: {
+                final BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                final String jobId = resolveBatchJobId(batchBody, exchange);
+                final String batchId = resolveBatchId(batchBody, exchange, IGNORE_BODY);
+                // the body can only carry the result id when it is not a BatchInfo
+                final String resultId = getParameter(RESULT_ID, exchange,
+                    batchBody != null ? IGNORE_BODY : USE_BODY, NOT_OPTIONAL);
+                bulkClient.getQueryResult(jobId, batchId, resultId,
+                    streamCallback(exchange, callback, "Error retrieving query result: "));
+                break;
+            }
+
+            default:
+                // without this an unhandled operation would return false and
+                // never invoke the callback, leaving the exchange hanging
+                throw new SalesforceException("Unknown operation name: " + operationName, null);
+            }
+
+        } catch (SalesforceException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Error processing %s: [%s] \"%s\"",
+                    operationName, e.getStatusCode(), e.getMessage()),
+                e));
+            callback.done(true);
+            done = true;
+        } catch (InvalidPayloadException e) {
+            exchange.setException(unexpectedError(e));
+            callback.done(true);
+            done = true;
+        } catch (RuntimeException e) {
+            exchange.setException(unexpectedError(e));
+            callback.done(true);
+            done = true;
+        }
+
+        // continue routing asynchronously if false
+        return done;
+    }
+
+    // job id from a JobInfo body if present, otherwise from header, endpoint config or body
+    private String resolveJobId(Exchange exchange) throws SalesforceException {
+        final JobInfo jobBody = exchange.getIn().getBody(JobInfo.class);
+        if (jobBody != null) {
+            return jobBody.getId();
+        }
+        return getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+    }
+
+    // job id from a BatchInfo body if present, otherwise from header or endpoint config
+    private String resolveBatchJobId(BatchInfo batchBody, Exchange exchange) throws SalesforceException {
+        if (batchBody != null) {
+            return batchBody.getJobId();
+        }
+        return getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+    }
+
+    // batch id from a BatchInfo body if present, otherwise looked up as a parameter
+    private String resolveBatchId(BatchInfo batchBody, Exchange exchange, boolean convertInBody)
+        throws SalesforceException {
+        if (batchBody != null) {
+            return batchBody.getId();
+        }
+        return getParameter(BATCH_ID, exchange, convertInBody, NOT_OPTIONAL);
+    }
+
+    // callback that completes the exchange with a JobInfo response
+    private BulkApiClient.JobInfoResponseCallback jobInfoCallback(
+        final Exchange exchange, final AsyncCallback callback) {
+        return new BulkApiClient.JobInfoResponseCallback() {
+            @Override
+            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                processResponse(exchange, jobInfo, ex, callback);
+            }
+        };
+    }
+
+    // callback that completes the exchange with a BatchInfo response
+    private BulkApiClient.BatchInfoResponseCallback batchInfoCallback(
+        final Exchange exchange, final AsyncCallback callback) {
+        return new BulkApiClient.BatchInfoResponseCallback() {
+            @Override
+            public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                processResponse(exchange, batchInfo, ex, callback);
+            }
+        };
+    }
+
+    // callback that copies the response stream into a StreamCache before completing the exchange
+    private BulkApiClient.StreamResponseCallback streamCallback(
+        final Exchange exchange, final AsyncCallback callback, final String errorPrefix) {
+        return new BulkApiClient.StreamResponseCallback() {
+            @Override
+            public void onResponse(InputStream inputStream, SalesforceException ex) {
+                // read the stream into a StreamCache
+                // ensures the connection is read
+                StreamCache body = null;
+                if (inputStream != null) {
+                    try {
+                        body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                    } catch (IOException e) {
+                        String msg = errorPrefix + e.getMessage();
+                        ex = new SalesforceException(msg, e);
+                    } finally {
+                        // close the input stream to release the Http connection
+                        try {
+                            inputStream.close();
+                        } catch (IOException ignored) {
+                            // best effort, the stream has already been consumed or has failed
+                        }
+                    }
+                }
+                processResponse(exchange, body, ex, callback);
+            }
+        };
+    }
+
+    // wraps a failure that is not a SalesforceException, keeping the cause
+    private SalesforceException unexpectedError(Exception e) {
+        return new SalesforceException(
+            String.format("Unexpected Error processing %s: \"%s\"",
+                operationName, e.getMessage()),
+            e);
+    }
+
+    /**
+     * Completes the exchange from a client callback: sets the exception or
+     * the Out body, copies In headers and attachments to Out, and notifies
+     * the callback.
+     */
+    private void processResponse(Exchange exchange, Object body, SalesforceException ex, AsyncCallback callback) {
+        final Message out = exchange.getOut();
+        if (ex != null) {
+            exchange.setException(ex);
+        } else {
+            out.setBody(body);
+        }
+
+        // copy headers and attachments
+        out.getHeaders().putAll(exchange.getIn().getHeaders());
+        out.getAttachments().putAll(exchange.getIn().getAttachments());
+
+        // signal exchange completion
+        callback.done(false);
+    }
+
+    @Override
+    public void start() throws Exception {
+        ServiceHelper.startService(bulkClient);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // stop the client
+        ServiceHelper.stopService(bulkClient);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
new file mode 100644
index 0000000..3ac32c6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.type.TypeReference;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * REST processor that marshals requests to JSON and un-marshals JSON
+ * responses using a Jackson {@link ObjectMapper}.
+ */
+public class JsonRestProcessor extends AbstractRestProcessor {
+
+    // it is ok to use a single thread safe ObjectMapper
+    private final ObjectMapper objectMapper;
+    // exchange property holding a TypeReference for responses that are generic types
+    private static final String RESPONSE_TYPE = JsonRestProcessor.class.getName() + ".responseType";
+
+    public JsonRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        this.objectMapper = new ObjectMapper();
+        // enable date time support including Joda DateTime
+        this.objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, false);
+    }
+
+    /**
+     * Registers the response class or type on the exchange for operations
+     * whose response type is known in advance. Other operations are left
+     * untouched.
+     */
+    @Override
+    protected void processRequest(Exchange exchange) {
+
+        switch (operationName) {
+        case GET_VERSIONS:
+            // handle in built response types
+            exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<Version>>() {});
+            break;
+
+        case GET_RESOURCES:
+            // handle in built response types
+            exchange.setProperty(RESPONSE_CLASS, RestResources.class);
+            break;
+
+        case GET_GLOBAL_OBJECTS:
+            // handle in built response types
+            exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
+            break;
+
+        case GET_BASIC_INFO:
+            // handle in built response types
+            exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);
+            break;
+
+        case GET_DESCRIPTION:
+            // handle in built response types
+            exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);
+            break;
+
+        case CREATE_SOBJECT:
+            // handle known response type
+            exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+            break;
+
+        case UPSERT_SOBJECT:
+            // handle known response type
+            exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+            break;
+
+        case SEARCH:
+            // handle known response type
+            exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<SearchResult>>() {});
+            break;
+
+        }
+    }
+
+    /**
+     * Returns the In body as a request stream. Tries, in order, the body
+     * as an InputStream, as an SObject marshaled to JSON, and as a String
+     * encoded in UTF-8.
+     *
+     * @throws SalesforceException if the body is none of the above, or marshaling fails
+     */
+    @Override
+    protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
+        try {
+            InputStream request;
+            Message in = exchange.getIn();
+            request = in.getBody(InputStream.class);
+            if (request == null) {
+                AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
+                if (sObject != null) {
+                    // marshall the SObject
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    objectMapper.writeValue(out, sObject);
+                    request = new ByteArrayInputStream(out.toByteArray());
+                } else {
+                    // if all else fails, get body as String
+                    final String body = in.getBody(String.class);
+                    if (null == body) {
+                        String msg = "Unsupported request message body " +
+                            (in.getBody() == null ? null : in.getBody().getClass());
+                        throw new SalesforceException(msg, null);
+                    } else {
+                        request = new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));
+                    }
+                }
+            }
+
+            return request;
+
+        } catch (IOException e) {
+            String msg = "Error marshaling request: " + e.getMessage();
+            throw new SalesforceException(msg, e);
+        }
+    }
+
+    /**
+     * Un-marshals the JSON response into the registered response class or
+     * type and sets it as the Out body. When neither is registered the
+     * response stream itself becomes the Out body and is left open for the
+     * consumer; in every other case the stream is closed here.
+     *
+     * @param exchange       exchange to update with the response
+     * @param responseEntity response stream, may be {@code null}
+     * @param ex             exception set on the exchange when there is no response entity
+     * @param callback       callback notified once the exchange has been updated
+     */
+    @Override
+    protected void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback) {
+
+        // true once the raw response stream has been handed out as the Out body
+        boolean streamReturned = false;
+
+        // process JSON response for TypeReference
+        try {
+            // do we need to un-marshal a response
+            if (responseEntity != null) {
+                Object response = null;
+                boolean rawStream = false;
+                Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
+                if (responseClass != null) {
+                    response = objectMapper.readValue(responseEntity, responseClass);
+                } else {
+                    TypeReference<?> responseType = exchange.getProperty(RESPONSE_TYPE, TypeReference.class);
+                    if (responseType != null) {
+                        response = objectMapper.readValue(responseEntity, responseType);
+                    } else {
+                        // return the response as a stream, for getBlobField
+                        response = responseEntity;
+                        rawStream = true;
+                    }
+                }
+                exchange.getOut().setBody(response);
+                streamReturned = rawStream;
+            } else {
+                exchange.setException(ex);
+            }
+            // copy headers and attachments
+            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+            exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
+        } catch (IOException e) {
+            String msg = "Error parsing JSON response: " + e.getMessage();
+            exchange.setException(new SalesforceException(msg, e));
+        } finally {
+            // cleanup temporary exchange headers
+            exchange.removeProperty(RESPONSE_CLASS);
+            exchange.removeProperty(RESPONSE_TYPE);
+
+            // consume response entity, unless it is now the Out body:
+            // closing it would hand the consumer an unreadable stream
+            try {
+                if (responseEntity != null && !streamReturned) {
+                    responseEntity.close();
+                }
+            } catch (IOException ignored) {
+                // best effort, the entity has already been read or has failed
+            }
+
+            // notify callback that exchange is done
+            callback.done(false);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
new file mode 100644
index 0000000..36f77f6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+
+/**
+ * A {@link Service} that processes an exchange for a Salesforce operation
+ * and reports completion through an {@link AsyncCallback}.
+ */
+public interface SalesforceProcessor extends Service {
+
+ /**
+ * Processes the exchange.
+ *
+ * @param exchange exchange to process
+ * @param callback callback to notify when the exchange is done
+ * @return presumably {@code true} if the exchange was completed synchronously
+ *         and {@code false} if it completes later through the callback,
+ *         as in Camel's AsyncProcessor -- confirm against implementations
+ */
+ boolean process(Exchange exchange, AsyncCallback callback);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
new file mode 100644
index 0000000..75449b6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.naming.NoNameCoder;
+import com.thoughtworks.xstream.io.xml.CompactWriter;
+import com.thoughtworks.xstream.io.xml.XppDriver;
+import com.thoughtworks.xstream.mapper.CachingMapper;
+import com.thoughtworks.xstream.mapper.CannotResolveClassException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.JodaTimeConverter;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.*;
+
+import java.io.*;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.SOBJECT_NAME;
+
+public class XmlRestProcessor extends AbstractRestProcessor {
+
+ // although XStream is generally thread safe, because of the way we use aliases
+ // for GET_BASIC_INFO and GET_DESCRIPTION, we need to use a ThreadLocal
+ // not very efficient when both JSON and XML are used together with a single Thread pool
+ // but this will do for now
+ private static ThreadLocal<XStream> xStream =
+ new ThreadLocal<XStream>() {
+ @Override
+ protected XStream initialValue() {
+ // use NoNameCoder to avoid escaping __ in custom field names
+ // and CompactWriter to avoid pretty printing
+ XStream result = new XStream(new XppDriver(new NoNameCoder()) {
+ @Override
+ public HierarchicalStreamWriter createWriter(Writer out) {
+ return new CompactWriter(out, getNameCoder());
+ }
+
+ });
+ result.registerConverter(new JodaTimeConverter());
+ return result;
+ }
+ };
+
+ private static final String RESPONSE_ALIAS = XmlRestProcessor.class.getName() + ".responseAlias";
+
+ public XmlRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+ super(endpoint);
+
+ }
+
+ @Override
+ protected void processRequest(Exchange exchange) throws SalesforceException {
+
+ switch (operationName) {
+ case GET_VERSIONS:
+ exchange.setProperty(RESPONSE_CLASS, Versions.class);
+ break;
+
+ case GET_RESOURCES:
+ exchange.setProperty(RESPONSE_CLASS, RestResources.class);
+ break;
+
+ case GET_GLOBAL_OBJECTS:
+ // handle in built response types
+ exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
+ break;
+
+ case GET_BASIC_INFO:
+ // handle in built response types
+ exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);
+
+ // need to add alias for Salesforce XML that uses SObject name as root element
+ exchange.setProperty(RESPONSE_ALIAS,
+ getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
+ break;
+
+ case GET_DESCRIPTION:
+ // handle in built response types
+ exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);
+
+ // need to add alias for Salesforce XML that uses SObject name as root element
+ exchange.setProperty(RESPONSE_ALIAS,
+ getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
+ break;
+
+ case GET_SOBJECT:
+ // need to add alias for Salesforce XML that uses SObject name as root element
+ exchange.setProperty(RESPONSE_ALIAS,
+ getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL));
+ break;
+
+ case CREATE_SOBJECT:
+ // handle known response type
+ exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+ break;
+
+ case GET_SOBJECT_WITH_ID:
+ // need to add alias for Salesforce XML that uses SObject name as root element
+ exchange.setProperty(RESPONSE_ALIAS,
+ getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL));
+ break;
+
+ case UPSERT_SOBJECT:
+ // handle known response type
+ exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+ break;
+
+ case QUERY:
+ case QUERY_MORE:
+ // need to add alias for Salesforce XML that uses SObject name as root element
+ exchange.setProperty(RESPONSE_ALIAS,
+ "QueryResult");
+ break;
+
+ case SEARCH:
+ // handle known response type
+ exchange.setProperty(RESPONSE_CLASS, SearchResults.class);
+ break;
+
+ }
+
+ }
+
+ protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
+ final XStream localXStream = xStream.get();
+ try {
+ // get request stream from In message
+ Message in = exchange.getIn();
+ InputStream request = in.getBody(InputStream.class);
+ if (request == null) {
+ AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
+ if (sObject != null) {
+ // marshall the SObject
+ // first process annotations on the class, for things like alias, etc.
+ localXStream.processAnnotations(sObject.getClass());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // make sure we write the XML with the right encoding
+ localXStream.toXML(sObject, new OutputStreamWriter(out, StringUtil.__UTF8_CHARSET));
+ request = new ByteArrayInputStream(out.toByteArray());
+ } else {
+ // if all else fails, get body as String
+ final String body = in.getBody(String.class);
+ if (null == body) {
+ String msg = "Unsupported request message body " +
+ (in.getBody() == null ? null : in.getBody().getClass());
+ throw new SalesforceException(msg, null);
+ } else {
+ request = new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));
+ }
+ }
+ }
+ return request;
+ } catch (XStreamException e) {
+ String msg = "Error marshaling request: " + e.getMessage();
+ throw new SalesforceException(msg, e);
+ }
+ }
+
+ @Override
+ protected void processResponse(Exchange exchange, InputStream responseEntity,
+ SalesforceException exception, AsyncCallback callback) {
+ final XStream localXStream = xStream.get();
+ try {
+ // do we need to un-marshal a response
+ if (responseEntity != null) {
+ final Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
+ Object response;
+ if (responseClass != null) {
+ // its ok to call this multiple times, as xstream ignores duplicate calls
+ localXStream.processAnnotations(responseClass);
+ final String responseAlias = exchange.getProperty(RESPONSE_ALIAS, String.class);
+ if (responseAlias != null) {
+ // extremely dirty, need to flush entire cache if its holding on to an old alias!!!
+ final CachingMapper mapper = (CachingMapper) localXStream.getMapper();
+ try {
+ if (mapper.realClass(responseAlias) != responseClass) {
+ mapper.flushCache();
+ }
+ } catch (CannotResolveClassException ignore) {
+ }
+ localXStream.alias(responseAlias, responseClass);
+ }
+ response = responseClass.newInstance();
+ localXStream.fromXML(responseEntity, response);
+ } else {
+ // return the response as a stream, for getBlobField
+ response = responseEntity;
+ }
+ exchange.getOut().setBody(response);
+ } else {
+ exchange.setException(exception);
+ }
+ // copy headers and attachments
+ exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
+ } catch (XStreamException e) {
+ String msg = "Error parsing XML response: " + e.getMessage();
+ exchange.setException(new SalesforceException(msg, e));
+ } catch (Exception e) {
+ String msg = "Error creating XML response: " + e.getMessage();
+ exchange.setException(new SalesforceException(msg, e));
+ } finally {
+ // cleanup temporary exchange headers
+ exchange.removeProperty(RESPONSE_CLASS);
+ exchange.removeProperty(RESPONSE_ALIAS);
+
+ // consume response entity
+ if (responseEntity != null) {
+ try {
+ responseEntity.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ // notify callback that exchange is done
+ callback.done(false);
+ }
+ }
+
+}