You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/05 17:12:51 UTC

[07/11] CAMEL-6428: camel-salesforce component. Thanks to Dhiraj Bokde for the contribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
new file mode 100644
index 0000000..807fef5
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForOperationsEnum.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonValue;
+
+/**
+ * Salesforce Enumeration DTO for picklist NotifyForOperations
+ */
+public enum NotifyForOperationsEnum {
+
+    CREATE("Create"),
+    UPDATE("Update"),
+    ALL("All");
+
+    final String value;
+
+    private NotifyForOperationsEnum(String value) {
+        this.value = value;
+    }
+
+    @JsonValue
+    public String value() {
+        return this.value;
+    }
+
+    @JsonCreator
+    public static NotifyForOperationsEnum forValue(String value) {
+        for (NotifyForOperationsEnum e : NotifyForOperationsEnum.values()) {
+            if (e.value.equals(value)) {
+                return e;
+            }
+        }
+        throw new IllegalArgumentException(value);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
new file mode 100644
index 0000000..2135a16
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/PushTopic.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import com.thoughtworks.xstream.annotations.XStreamConverter;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.apache.camel.component.salesforce.api.PicklistEnumConverter;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+
+/**
+ * Salesforce DTO for SObject PushTopic
+ */
+@XStreamAlias("PushTopic")
+public class PushTopic extends AbstractSObjectBase {
+
+    private String Query;
+    private Double ApiVersion;
+    private Boolean IsActive;
+    @XStreamConverter(PicklistEnumConverter.class)
+    private NotifyForFieldsEnum NotifyForFields;
+    @XStreamConverter(PicklistEnumConverter.class)
+    private NotifyForOperationsEnum NotifyForOperations;
+    private String Description;
+
+    @JsonProperty("Query")
+    public String getQuery() {
+        return this.Query;
+    }
+
+    @JsonProperty("Query")
+    public void setQuery(String Query) {
+        this.Query = Query;
+    }
+
+    @JsonProperty("ApiVersion")
+    public Double getApiVersion() {
+        return this.ApiVersion;
+    }
+
+    @JsonProperty("ApiVersion")
+    public void setApiVersion(Double ApiVersion) {
+        this.ApiVersion = ApiVersion;
+    }
+
+    @JsonProperty("IsActive")
+    public Boolean getIsActive() {
+        return this.IsActive;
+    }
+
+    @JsonProperty("IsActive")
+    public void setIsActive(Boolean IsActive) {
+        this.IsActive = IsActive;
+    }
+
+    @JsonProperty("NotifyForFields")
+    public NotifyForFieldsEnum getNotifyForFields() {
+        return this.NotifyForFields;
+    }
+
+    @JsonProperty("NotifyForFields")
+    public void setNotifyForFields(NotifyForFieldsEnum NotifyForFields) {
+        this.NotifyForFields = NotifyForFields;
+    }
+
+    @JsonProperty("NotifyForOperations")
+    public NotifyForOperationsEnum getNotifyForOperations() {
+        return this.NotifyForOperations;
+    }
+
+    @JsonProperty("NotifyForOperations")
+    public void setNotifyForOperations(NotifyForOperationsEnum NotifyForOperations) {
+        this.NotifyForOperations = NotifyForOperations;
+    }
+
+    @JsonProperty("Description")
+    public String getDescription() {
+        return this.Description;
+    }
+
+    @JsonProperty("Description")
+    public void setDescription(String Description) {
+        this.Description = Description;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
new file mode 100644
index 0000000..4adc13c
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/QueryRecordsPushTopic.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamImplicit;
+import org.apache.camel.component.salesforce.api.dto.AbstractQueryRecordsBase;
+
+import java.util.List;
+
+/**
+ * Salesforce Query Records DTO for PushTopic
+ */
+public class QueryRecordsPushTopic extends AbstractQueryRecordsBase {
+    @XStreamImplicit
+    private List<PushTopic> records;
+
+    public List<PushTopic> getRecords() {
+        return records;
+    }
+
+    public void setRecords(List<PushTopic> records) {
+        this.records = records;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
new file mode 100644
index 0000000..caf59fe
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/RestErrors.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import com.thoughtworks.xstream.annotations.XStreamImplicit;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+
+import java.util.List;
+
+/**
+ * DTO for Salesforce REST errors
+ */
+@XStreamAlias("Errors")
+public class RestErrors {
+
+    @XStreamImplicit(itemFieldName = "Error")
+    private List<RestError> errors;
+
+    public List<RestError> getErrors() {
+        return errors;
+    }
+
+    public void setErrors(List<RestError> errors) {
+        this.errors = errors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
new file mode 100644
index 0000000..1e40d18
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+import org.apache.camel.component.salesforce.internal.PayloadFormat;
+import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
+import org.apache.camel.component.salesforce.internal.client.RestClient;
+
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;
+
+public abstract class AbstractRestProcessor extends AbstractSalesforceProcessor {
+
+    protected static final String RESPONSE_CLASS = AbstractRestProcessor.class.getName() + ".responseClass";
+
+    private RestClient restClient;
+    private Map<String, Class<?>> classMap;
+
+    public AbstractRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        final PayloadFormat payloadFormat = endpoint.getConfiguration().getPayloadFormat();
+
+        this.restClient = new DefaultRestClient(httpClient, endpointConfigMap.get(API_VERSION),
+            payloadFormat.toString().toLowerCase() , session);
+
+        this.classMap = endpoint.getComponent().getClassMap();
+    }
+
+    @Override
+    public void start() throws Exception {
+        ServiceHelper.startService(restClient);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        ServiceHelper.stopService(restClient);
+    }
+
+    @Override
+    public final boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+        // pre-process request message
+        try {
+            processRequest(exchange);
+        } catch (SalesforceException e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        } catch (RuntimeException e) {
+            exchange.setException(new SalesforceException(e.getMessage(), e));
+            callback.done(true);
+            return true;
+        }
+
+        // call Salesforce asynchronously
+        try {
+
+            // call Operation using REST client
+            switch (operationName) {
+                case GET_VERSIONS:
+                    restClient.getVersions(new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            // process response entity and create out message
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case GET_RESOURCES:
+                    restClient.getResources(new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case GET_GLOBAL_OBJECTS:
+                    restClient.getGlobalObjects(new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case GET_BASIC_INFO:
+                    String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+                    restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case GET_DESCRIPTION:
+                    sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+                    restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case GET_SOBJECT:
+                {
+                    String sObjectIdValue;
+                    // determine parameters from input AbstractSObject
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        sObjectIdValue = sObjectBase.getId();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    final String sObjectId = sObjectIdValue;
+
+                    // use sObject name to load class
+                    setResponseClass(exchange, sObjectName);
+
+                    // get optional field list
+                    String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
+                    String[] fields = null;
+                    if (fieldsValue != null) {
+                        fields = fieldsValue.split(",");
+                    }
+
+                    restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                            restoreFields(exchange, sObjectBase, sObjectId, null, null);
+                        }
+                    });
+
+                    break;
+                }
+
+                case CREATE_SOBJECT:
+                {
+                    // determine parameters from input AbstractSObject
+                    AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    }
+
+                    restClient.createSObject(sObjectName, getRequestStream(exchange),
+                        new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+
+                    break;
+                }
+
+                case UPDATE_SOBJECT:
+                {
+                    // determine parameters from input AbstractSObject
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    String sObjectId;
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        // remember the sObject Id
+                        sObjectId = sObjectBase.getId();
+                        // clear base object fields, which cannot be updated
+                        sObjectBase.clearBaseFields();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    }
+
+                    final String finalsObjectId = sObjectId;
+                    restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
+                        new RestClient.ResponseCallback() {
+                            @Override
+                            public void onResponse(InputStream response, SalesforceException exception) {
+                                processResponse(exchange, response, exception, callback);
+                                restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
+                            }
+                        });
+
+                    break;
+                }
+
+                case DELETE_SOBJECT:
+                {
+                    // determine parameters from input AbstractSObject
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    String sObjectIdValue;
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        sObjectIdValue = sObjectBase.getId();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    final String sObjectId = sObjectIdValue;
+
+                    restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                            restoreFields(exchange, sObjectBase, sObjectId, null, null);
+                        }
+                    });
+                    break;
+                }
+
+                case GET_SOBJECT_WITH_ID:
+                {
+                    Object oldValue = null;
+                    String sObjectExtIdValue;
+                    final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
+                        exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                    // determine parameters from input AbstractSObject
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                        sObjectExtIdValue = oldValue.toString();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+
+                    // use sObject name to load class
+                    setResponseClass(exchange, sObjectName);
+
+                    final Object finalOldValue = oldValue;
+                    restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                        new RestClient.ResponseCallback() {
+                            @Override
+                            public void onResponse(InputStream response, SalesforceException exception) {
+                                processResponse(exchange, response, exception, callback);
+                                restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
+                            }
+                        });
+
+                    break;
+                }
+
+                case UPSERT_SOBJECT:
+                {
+                    String sObjectExtIdValue;
+                    final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
+                        IGNORE_BODY, NOT_OPTIONAL);
+
+                    // determine parameters from input AbstractSObject
+                    Object oldValue = null;
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                        sObjectExtIdValue = oldValue.toString();
+                        // clear base object fields, which cannot be updated
+                        sObjectBase.clearBaseFields();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    }
+
+                    final Object finalOldValue = oldValue;
+                    restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                        getRequestStream(exchange), new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                            restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
+                        }
+                    });
+
+                    break;
+                }
+
+                case DELETE_SOBJECT_WITH_ID:
+                {
+                    final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                    // determine parameters from input AbstractSObject
+                    Object oldValue = null;
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    String sObjectExtIdValue;
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+                        sObjectExtIdValue = oldValue.toString();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+
+                    final Object finalOldValue = oldValue;
+                    restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                        new RestClient.ResponseCallback() {
+                            @Override
+                            public void onResponse(InputStream response, SalesforceException exception) {
+                                processResponse(exchange, response, exception, callback);
+                                restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
+                            }
+                        });
+
+                    break;
+                }
+
+                case GET_BLOB_FIELD:
+                {
+                    // get blob field name
+                    final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
+                        exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                    // determine parameters from input AbstractSObject
+                    final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+                    String sObjectIdValue;
+                    if (sObjectBase != null) {
+                        sObjectName = sObjectBase.getClass().getSimpleName();
+                        sObjectIdValue = sObjectBase.getId();
+                    } else {
+                        sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    final String sObjectId = sObjectIdValue;
+
+                    restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
+                        new RestClient.ResponseCallback() {
+                            @Override
+                            public void onResponse(InputStream response, SalesforceException exception) {
+                                processResponse(exchange, response, exception, callback);
+                                restoreFields(exchange, sObjectBase, sObjectId, null, null);
+                            }
+                        });
+                    break;
+                }
+
+                case QUERY:
+                    final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+                    // use sObject name to load class
+                    setResponseClass(exchange, null);
+
+                    restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case QUERY_MORE:
+                    // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
+                    final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+                    // use custom response class property
+                    setResponseClass(exchange, null);
+
+                    restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+                case SEARCH:
+                    final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+
+                    restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream response, SalesforceException exception) {
+                            processResponse(exchange, response, exception, callback);
+                        }
+                    });
+                    break;
+
+            }
+
+        } catch (SalesforceException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Error processing %s: [%s] \"%s\"",
+                    operationName, e.getStatusCode(), e.getMessage()),
+                e));
+            callback.done(true);
+            return true;
+        } catch (RuntimeException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Unexpected Error processing %s: \"%s\"",
+                    operationName, e.getMessage()),
+                e));
+            callback.done(true);
+            return true;
+        }
+
+        // continue routing asynchronously
+        return false;
+    }
+
+    private void restoreFields(Exchange exchange, AbstractSObjectBase sObjectBase,
+                               String sObjectId, String sObjectExtIdName, Object oldValue) {
+        // restore fields
+        if (sObjectBase != null) {
+            // restore the Id if it was cleared
+            if (sObjectId != null) {
+                sObjectBase.setId(sObjectId);
+            }
+            // restore the external id if it was cleared
+            if (sObjectExtIdName != null && oldValue != null) {
+                try {
+                    setPropertyValue(sObjectBase, sObjectExtIdName, oldValue);
+                } catch (SalesforceException e) {
+                    // YES, the exchange may fail if the property cannot be reset!!!
+                    exchange.setException(e);
+                }
+            }
+        }
+    }
+
+    private void setPropertyValue(AbstractSObjectBase sObjectBase, String name, Object value) throws SalesforceException {
+        try {
+            // set the value with the set method
+            Method setMethod = sObjectBase.getClass().getMethod("set" + name, value.getClass());
+            setMethod.invoke(sObjectBase, value);
+        } catch (NoSuchMethodException e) {
+            throw new SalesforceException(
+                String.format("SObject %s does not have a field %s",
+                    sObjectBase.getClass().getName(), name),
+                e);
+        } catch (InvocationTargetException e) {
+            throw new SalesforceException(
+                String.format("Error setting value %s.%s",
+                    sObjectBase.getClass().getSimpleName(), name),
+                e);
+        } catch (IllegalAccessException e) {
+            throw new SalesforceException(
+                String.format("Error accessing value %s.%s",
+                    sObjectBase.getClass().getSimpleName(), name),
+                e);
+        }
+    }
+
+    private Object getAndClearPropertyValue(AbstractSObjectBase sObjectBase, String propertyName) throws SalesforceException {
+        try {
+            // obtain the value using the get method
+            Method getMethod = sObjectBase.getClass().getMethod("get" + propertyName);
+            Object value = getMethod.invoke(sObjectBase);
+
+            // clear the value with the set method
+            Method setMethod = sObjectBase.getClass().getMethod("set" + propertyName, getMethod.getReturnType());
+            setMethod.invoke(sObjectBase, new Object[] { null });
+
+            return value;
+        } catch (NoSuchMethodException e) {
+            throw new SalesforceException(
+                String.format("SObject %s does not have a field %s",
+                    sObjectBase.getClass().getSimpleName(), propertyName),
+                e);
+        } catch (InvocationTargetException e) {
+            throw new SalesforceException(
+                String.format("Error getting/setting value %s.%s",
+                    sObjectBase.getClass().getSimpleName(), propertyName),
+                e);
+        } catch (IllegalAccessException e) {
+            throw new SalesforceException(
+                String.format("Error accessing value %s.%s",
+                    sObjectBase.getClass().getSimpleName(), propertyName),
+                e);
+        }
+    }
+
+    // pre-process request message
+    protected abstract void processRequest(Exchange exchange) throws SalesforceException;
+
+    // get request stream from In message
+    protected abstract InputStream getRequestStream(Exchange exchange) throws SalesforceException;
+
+    private void setResponseClass(Exchange exchange, String sObjectName) throws SalesforceException {
+        Class<?> sObjectClass;
+
+        if (sObjectName != null) {
+            // lookup class from class map
+            sObjectClass = classMap.get(sObjectName);
+            if (null == sObjectClass) {
+                throw new SalesforceException(String.format("No class found for SObject %s", sObjectName), null);
+            }
+
+        } else {
+
+            // use custom response class property
+            final String className = getParameter(SOBJECT_CLASS, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            try {
+                sObjectClass = endpoint.getComponent().getCamelContext()
+                    .getClassResolver().resolveMandatoryClass(className);
+            } catch (ClassNotFoundException e) {
+                throw new SalesforceException(
+                    String.format("SObject class not found %s, %s",
+                        className, e.getMessage()),
+                    e);
+            }
+        }
+        exchange.setProperty(RESPONSE_CLASS, sObjectClass);
+    }
+
+    // process response entity and set out message in exchange
+    protected abstract void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
new file mode 100644
index 0000000..e784458
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.OperationName;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public abstract class AbstractSalesforceProcessor implements SalesforceProcessor {
+
+    protected static final boolean NOT_OPTIONAL = false;
+    protected static final boolean IS_OPTIONAL = true;
+    protected static final boolean USE_BODY = true;
+    protected static final boolean IGNORE_BODY = false;
+    protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
+
+    protected final SalesforceEndpoint endpoint;
+    protected final Map<String, String> endpointConfigMap;
+
+    protected final OperationName operationName;
+    protected final SalesforceSession session;
+    protected final HttpClient httpClient;
+
+    public AbstractSalesforceProcessor(SalesforceEndpoint endpoint) {
+        this.endpoint = endpoint;
+        this.operationName = endpoint.getOperationName();
+        this.endpointConfigMap = endpoint.getConfiguration().toValueMap();
+
+        final SalesforceComponent component = endpoint.getComponent();
+        this.session = component.getSession();
+        this.httpClient = endpoint.getConfiguration().getHttpClient();
+    }
+
+    @Override
+    public abstract boolean process(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Gets value for a parameter from header, endpoint config, or exchange body (optional).
+     *
+     * @param exchange exchange to inspect
+     * @param convertInBody converts In body to String value if true
+     * @param propName name of property
+     * @param optional if {@code true} returns null, otherwise throws RestException
+     * @return value of property, or {@code null} for optional parameters if not found.
+     * @throws org.apache.camel.component.salesforce.api.SalesforceException if the property can't be found.
+     */
+    protected final String getParameter(String propName, Exchange exchange, boolean convertInBody, boolean optional) throws SalesforceException {
+        String propValue = exchange.getIn().getHeader(propName, String.class);
+        propValue = propValue == null ? endpointConfigMap.get(propName) : propValue;
+        propValue = (propValue == null && convertInBody) ? exchange.getIn().getBody(String.class) : propValue;
+
+        // error if property was not set
+        if (propValue == null && !optional) {
+            String msg = "Missing property " + propName;
+            throw new SalesforceException(msg, null);
+        }
+
+        return propValue;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
new file mode 100644
index 0000000..6e070f7
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.*;
+import org.apache.camel.converter.stream.StreamCacheConverter;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.internal.client.BulkApiClient;
+import org.apache.camel.component.salesforce.internal.client.DefaultBulkApiClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.*;
+
+public class BulkApiProcessor extends AbstractSalesforceProcessor {
+
+    private BulkApiClient bulkClient;
+
+    public BulkApiProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        this.bulkClient = new DefaultBulkApiClient(
+            endpointConfigMap.get(SalesforceEndpointConfig.API_VERSION), session, httpClient);
+    }
+
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+        boolean done = false;
+        try {
+            switch (operationName) {
+                case CREATE_JOB:
+                    JobInfo jobBody = exchange.getIn().getMandatoryBody(JobInfo.class);
+                    bulkClient.createJob(jobBody, new BulkApiClient.JobInfoResponseCallback() {
+                        @Override
+                        public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                            processResponse(exchange, jobInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_JOB:
+                    jobBody = exchange.getIn().getBody(JobInfo.class);
+                    String jobId;
+                    if (jobBody != null) {
+                        jobId = jobBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+                        @Override
+                        public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                            processResponse(exchange, jobInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case CLOSE_JOB:
+                    jobBody = exchange.getIn().getBody(JobInfo.class);
+                    if (jobBody != null) {
+                        jobId = jobBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.closeJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+                        @Override
+                        public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                            processResponse(exchange, jobInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case ABORT_JOB:
+                    jobBody = exchange.getIn().getBody(JobInfo.class);
+                    if (jobBody != null) {
+                        jobId = jobBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.abortJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+                        @Override
+                        public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                            processResponse(exchange, jobInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case CREATE_BATCH:
+                    // since request is in the body, use headers or endpoint params
+                    ContentType contentType = ContentType.fromValue(
+                        getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+                    InputStream request;
+                    try {
+                        request = exchange.getIn().getMandatoryBody(InputStream.class);
+                    } catch (CamelException e) {
+                        String msg = "Error preparing batch request: " + e.getMessage();
+                        throw new SalesforceException(msg, e);
+                    }
+
+                    bulkClient.createBatch(request, jobId, contentType, new BulkApiClient.BatchInfoResponseCallback() {
+                        @Override
+                        public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                            processResponse(exchange, batchInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_BATCH:
+                    BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+                    String batchId;
+                    if (batchBody != null) {
+                        jobId = batchBody.getJobId();
+                        batchId = batchBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getBatch(jobId, batchId, new BulkApiClient.BatchInfoResponseCallback() {
+                        @Override
+                        public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                            processResponse(exchange, batchInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_ALL_BATCHES:
+                    jobBody = exchange.getIn().getBody(JobInfo.class);
+                    if (jobBody != null) {
+                        jobId = jobBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getAllBatches(jobId, new BulkApiClient.BatchInfoListResponseCallback() {
+                        @Override
+                        public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
+                            processResponse(exchange, batchInfoList, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_REQUEST:
+                    batchBody = exchange.getIn().getBody(BatchInfo.class);
+                    if (batchBody != null) {
+                        jobId = batchBody.getJobId();
+                        batchId = batchBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+
+                    bulkClient.getRequest(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream inputStream, SalesforceException ex) {
+                            // read the request stream into a StreamCache temp file
+                            // ensures the connection is read
+                            StreamCache body = null;
+                            if (inputStream != null) {
+                                try {
+                                    body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                                } catch (IOException e) {
+                                    String msg = "Error retrieving batch request: " + e.getMessage();
+                                    ex = new SalesforceException(msg, e);
+                                } finally {
+                                    // close the input stream to release the Http connection
+                                    try {
+                                        inputStream.close();
+                                    } catch (IOException ignore) {
+                                    }
+                                }
+                            }
+                            processResponse(exchange, body, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_RESULTS:
+                    batchBody = exchange.getIn().getBody(BatchInfo.class);
+                    if (batchBody != null) {
+                        jobId = batchBody.getJobId();
+                        batchId = batchBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getResults(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream inputStream, SalesforceException ex) {
+                            // read the result stream into a StreamCache temp file
+                            // ensures the connection is read
+                            StreamCache body = null;
+                            if (inputStream != null) {
+                                try {
+                                    body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                                } catch (IOException e) {
+                                    String msg = "Error retrieving batch results: " + e.getMessage();
+                                    ex = new SalesforceException(msg, e);
+                                } finally {
+                                    // close the input stream to release the Http connection
+                                    try {
+                                        inputStream.close();
+                                    } catch (IOException ignore) {
+                                    }
+                                }
+                            }
+                            processResponse(exchange, body, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case CREATE_BATCH_QUERY:
+                    jobBody = exchange.getIn().getBody(JobInfo.class);
+                    String soqlQuery;
+                    if (jobBody != null) {
+                        jobId = jobBody.getId();
+                        contentType = jobBody.getContentType();
+                        // use SOQL query from header or endpoint config
+                        soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        contentType = ContentType.fromValue(
+                            getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                        // reuse SOBJECT_QUERY property
+                        soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
+                        new BulkApiClient.BatchInfoResponseCallback() {
+                        @Override
+                        public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                            processResponse(exchange, batchInfo, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_QUERY_RESULT_IDS:
+                    batchBody = exchange.getIn().getBody(BatchInfo.class);
+                    if (batchBody != null) {
+                        jobId = batchBody.getJobId();
+                        batchId = batchBody.getId();
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
+                        @Override
+                        public void onResponse(List<String> ids, SalesforceException ex) {
+                            processResponse(exchange, ids, ex, callback);
+                        }
+                    });
+
+                    break;
+
+                case GET_QUERY_RESULT:
+                    batchBody = exchange.getIn().getBody(BatchInfo.class);
+                    String resultId;
+                    if (batchBody != null) {
+                        jobId = batchBody.getJobId();
+                        batchId = batchBody.getId();
+                        resultId = getParameter(RESULT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                    } else {
+                        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        batchId = getParameter(BATCH_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+                        resultId = getParameter(RESULT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+                    }
+                    bulkClient.getQueryResult(jobId, batchId, resultId, new BulkApiClient.StreamResponseCallback() {
+                        @Override
+                        public void onResponse(InputStream inputStream, SalesforceException ex) {
+                            StreamCache body = null;
+                            if (inputStream != null) {
+                                // read the result stream into a StreamCache temp file
+                                // ensures the connection is read
+                                try {
+                                    body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                                } catch (IOException e) {
+                                    String msg = "Error retrieving query result: " + e.getMessage();
+                                    ex = new SalesforceException(msg, e);
+                                } finally {
+                                    // close the input stream to release the Http connection
+                                    try {
+                                        inputStream.close();
+                                    } catch (IOException e) {
+                                        // ignore
+                                    }
+                                }
+                            }
+                            processResponse(exchange, body, ex, callback);
+                        }
+                    });
+
+                    break;
+            }
+
+        } catch (SalesforceException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Error processing %s: [%s] \"%s\"",
+                    operationName, e.getStatusCode(), e.getMessage()),
+                e));
+            callback.done(true);
+            done = true;
+        } catch (InvalidPayloadException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Unexpected Error processing %s: \"%s\"",
+                    operationName, e.getMessage()),
+                e));
+            callback.done(true);
+            done = true;
+        } catch (RuntimeException e) {
+            exchange.setException(new SalesforceException(
+                String.format("Unexpected Error processing %s: \"%s\"",
+                    operationName, e.getMessage()),
+                e));
+            callback.done(true);
+            done = true;
+        }
+
+        // continue routing asynchronously if false
+        return done;
+    }
+
+    private void processResponse(Exchange exchange, Object body, SalesforceException ex, AsyncCallback callback) {
+        final Message out = exchange.getOut();
+        if (ex != null) {
+            exchange.setException(ex);
+        } else {
+            out.setBody(body);
+        }
+
+        // copy headers and attachments
+        out.getHeaders().putAll(exchange.getIn().getHeaders());
+        out.getAttachments().putAll(exchange.getIn().getAttachments());
+
+        // signal exchange completion
+        callback.done(false);
+    }
+
+    @Override
+    public void start() throws Exception {
+        ServiceHelper.startService(bulkClient);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // stop the client
+        ServiceHelper.stopService(bulkClient);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
new file mode 100644
index 0000000..3ac32c6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/JsonRestProcessor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.type.TypeReference;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class JsonRestProcessor extends AbstractRestProcessor {
+
+    // it is ok to use a single thread safe ObjectMapper
+    private final ObjectMapper objectMapper;
+    private static final String RESPONSE_TYPE = JsonRestProcessor.class.getName() + ".responseType";
+
+    public JsonRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+        this.objectMapper = new ObjectMapper();
+        // enable date time support including Joda DateTime
+        this.objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, false);
+    }
+
+    @Override
+    protected void processRequest(Exchange exchange) {
+
+        switch (operationName) {
+            case GET_VERSIONS:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<Version>>() {});
+                break;
+
+            case GET_RESOURCES:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, RestResources.class);
+                break;
+
+            case GET_GLOBAL_OBJECTS:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
+                break;
+
+            case GET_BASIC_INFO:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);
+                break;
+
+            case GET_DESCRIPTION:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);
+                break;
+
+            case CREATE_SOBJECT:
+                // handle known response type
+                exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+                break;
+
+            case UPSERT_SOBJECT:
+                // handle known response type
+                exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+                break;
+
+            case SEARCH:
+                // handle known response type
+                exchange.setProperty(RESPONSE_TYPE, new TypeReference<List<SearchResult>>() {});
+                break;
+
+        }
+    }
+
+    @Override
+    protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
+        try {
+            InputStream request;
+            Message in = exchange.getIn();
+            request = in.getBody(InputStream.class);
+            if (request == null) {
+                AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
+                if (sObject != null) {
+                    // marshall the SObject
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    objectMapper.writeValue(out, sObject);
+                    request = new ByteArrayInputStream(out.toByteArray());
+                } else {
+                    // if all else fails, get body as String
+                    final String body = in.getBody(String.class);
+                    if (null == body) {
+                        String msg = "Unsupported request message body " +
+                            (in.getBody() == null ? null : in.getBody().getClass());
+                        throw new SalesforceException(msg, null);
+                    } else {
+                        request = new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));
+                    }
+                }
+            }
+
+            return request;
+
+        } catch (IOException e) {
+            String msg = "Error marshaling request: " + e.getMessage();
+            throw new SalesforceException(msg, e);
+        }
+    }
+
+    @Override
+    protected void processResponse(Exchange exchange, InputStream responseEntity, SalesforceException ex, AsyncCallback callback) {
+
+        // process JSON response for TypeReference
+        try {
+            // do we need to un-marshal a response
+            if (responseEntity != null) {
+                Object response = null;
+                Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
+                if (responseClass != null) {
+                    response = objectMapper.readValue(responseEntity, responseClass);
+                } else {
+                    TypeReference<?> responseType = exchange.getProperty(RESPONSE_TYPE, TypeReference.class);
+                    if (responseType != null) {
+                        response = objectMapper.readValue(responseEntity, responseType);
+                    } else {
+                        // return the response as a stream, for getBlobField
+                        response = responseEntity;
+                    }
+                }
+                exchange.getOut().setBody(response);
+            } else {
+                exchange.setException(ex);
+            }
+            // copy headers and attachments
+            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+            exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
+        } catch (IOException e) {
+            String msg = "Error parsing JSON response: " + e.getMessage();
+            exchange.setException(new SalesforceException(msg, e));
+        } finally {
+            // cleanup temporary exchange headers
+            exchange.removeProperty(RESPONSE_CLASS);
+            exchange.removeProperty(RESPONSE_TYPE);
+
+            // consume response entity
+            try {
+                if (responseEntity != null) {
+                    responseEntity.close();
+                }
+            } catch (IOException ignored) {
+            }
+
+            // notify callback that exchange is done
+            callback.done(false);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
new file mode 100644
index 0000000..36f77f6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/SalesforceProcessor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+
+public interface SalesforceProcessor extends Service {
+
+    boolean process(Exchange exchange, AsyncCallback callback);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
new file mode 100644
index 0000000..75449b6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/XmlRestProcessor.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.naming.NoNameCoder;
+import com.thoughtworks.xstream.io.xml.CompactWriter;
+import com.thoughtworks.xstream.io.xml.XppDriver;
+import com.thoughtworks.xstream.mapper.CachingMapper;
+import com.thoughtworks.xstream.mapper.CannotResolveClassException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.JodaTimeConverter;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.*;
+
+import java.io.*;
+
+import static org.apache.camel.component.salesforce.SalesforceEndpointConfig.SOBJECT_NAME;
+
+public class XmlRestProcessor extends AbstractRestProcessor {
+
+    // although XStream is generally thread safe, because of the way we use aliases
+    // for GET_BASIC_INFO and GET_DESCRIPTION, we need to use a ThreadLocal
+    // not very efficient when both JSON and XML are used together with a single Thread pool
+    // but this will do for now
+    private static ThreadLocal<XStream> xStream =
+        new ThreadLocal<XStream>() {
+            @Override
+            protected XStream initialValue() {
+                // use NoNameCoder to avoid escaping __ in custom field names
+                // and CompactWriter to avoid pretty printing
+                XStream result = new XStream(new XppDriver(new NoNameCoder()) {
+                    @Override
+                    public HierarchicalStreamWriter createWriter(Writer out) {
+                        return new CompactWriter(out, getNameCoder());
+                    }
+
+                });
+                result.registerConverter(new JodaTimeConverter());
+                return result;
+            }
+        };
+
+    private static final String RESPONSE_ALIAS = XmlRestProcessor.class.getName() + ".responseAlias";
+
+    public XmlRestProcessor(SalesforceEndpoint endpoint) throws SalesforceException {
+        super(endpoint);
+
+    }
+
+    @Override
+    protected void processRequest(Exchange exchange) throws SalesforceException {
+
+        switch (operationName) {
+            case GET_VERSIONS:
+                exchange.setProperty(RESPONSE_CLASS, Versions.class);
+                break;
+
+            case GET_RESOURCES:
+                exchange.setProperty(RESPONSE_CLASS, RestResources.class);
+                break;
+
+            case GET_GLOBAL_OBJECTS:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, GlobalObjects.class);
+                break;
+
+            case GET_BASIC_INFO:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, SObjectBasicInfo.class);
+
+                // need to add alias for Salesforce XML that uses SObject name as root element
+                exchange.setProperty(RESPONSE_ALIAS,
+                    getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
+                break;
+
+            case GET_DESCRIPTION:
+                // handle in built response types
+                exchange.setProperty(RESPONSE_CLASS, SObjectDescription.class);
+
+                // need to add alias for Salesforce XML that uses SObject name as root element
+                exchange.setProperty(RESPONSE_ALIAS,
+                    getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL));
+                break;
+
+            case GET_SOBJECT:
+                // need to add alias for Salesforce XML that uses SObject name as root element
+                exchange.setProperty(RESPONSE_ALIAS,
+                    getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                break;
+
+            case CREATE_SOBJECT:
+                // handle known response type
+                exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+                break;
+
+            case GET_SOBJECT_WITH_ID:
+                // need to add alias for Salesforce XML that uses SObject name as root element
+                exchange.setProperty(RESPONSE_ALIAS,
+                    getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL));
+                break;
+
+            case UPSERT_SOBJECT:
+                // handle known response type
+                exchange.setProperty(RESPONSE_CLASS, CreateSObjectResult.class);
+                break;
+
+            case QUERY:
+            case QUERY_MORE:
+                // need to add alias for Salesforce XML that uses SObject name as root element
+                exchange.setProperty(RESPONSE_ALIAS,
+                    "QueryResult");
+                break;
+
+            case SEARCH:
+                // handle known response type
+                exchange.setProperty(RESPONSE_CLASS, SearchResults.class);
+                break;
+
+        }
+
+    }
+
+    protected InputStream getRequestStream(Exchange exchange) throws SalesforceException {
+        final XStream localXStream = xStream.get();
+        try {
+            // get request stream from In message
+            Message in = exchange.getIn();
+            InputStream request = in.getBody(InputStream.class);
+            if (request == null) {
+                AbstractSObjectBase sObject = in.getBody(AbstractSObjectBase.class);
+                if (sObject != null) {
+                    // marshall the SObject
+                    // first process annotations on the class, for things like alias, etc.
+                    localXStream.processAnnotations(sObject.getClass());
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    // make sure we write the XML with the right encoding
+                    localXStream.toXML(sObject, new OutputStreamWriter(out, StringUtil.__UTF8_CHARSET));
+                    request = new ByteArrayInputStream(out.toByteArray());
+                } else {
+                    // if all else fails, get body as String
+                    final String body = in.getBody(String.class);
+                    if (null == body) {
+                        String msg = "Unsupported request message body " +
+                            (in.getBody() == null ? null : in.getBody().getClass());
+                        throw new SalesforceException(msg, null);
+                    } else {
+                        request = new ByteArrayInputStream(body.getBytes(StringUtil.__UTF8_CHARSET));
+                    }
+                }
+            }
+            return request;
+        } catch (XStreamException e) {
+            String msg = "Error marshaling request: " + e.getMessage();
+            throw new SalesforceException(msg, e);
+        }
+    }
+
+    @Override
+    protected void processResponse(Exchange exchange, InputStream responseEntity,
+                                   SalesforceException exception, AsyncCallback callback) {
+        final XStream localXStream = xStream.get();
+        try {
+            // do we need to un-marshal a response
+            if (responseEntity != null) {
+                final Class<?> responseClass = exchange.getProperty(RESPONSE_CLASS, Class.class);
+                Object response;
+                if (responseClass != null) {
+                    // its ok to call this multiple times, as xstream ignores duplicate calls
+                    localXStream.processAnnotations(responseClass);
+                    final String responseAlias = exchange.getProperty(RESPONSE_ALIAS, String.class);
+                    if (responseAlias != null) {
+                        // extremely dirty, need to flush entire cache if its holding on to an old alias!!!
+                        final CachingMapper mapper = (CachingMapper) localXStream.getMapper();
+                        try {
+                            if (mapper.realClass(responseAlias) != responseClass) {
+                                mapper.flushCache();
+                            }
+                        } catch (CannotResolveClassException ignore) {
+                        }
+                        localXStream.alias(responseAlias, responseClass);
+                    }
+                    response = responseClass.newInstance();
+                    localXStream.fromXML(responseEntity, response);
+                } else {
+                    // return the response as a stream, for getBlobField
+                    response = responseEntity;
+                }
+                exchange.getOut().setBody(response);
+            } else {
+                exchange.setException(exception);
+            }
+            // copy headers and attachments
+            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+            exchange.getOut().getAttachments().putAll(exchange.getIn().getAttachments());
+        } catch (XStreamException e) {
+            String msg = "Error parsing XML response: " + e.getMessage();
+            exchange.setException(new SalesforceException(msg, e));
+        } catch (Exception e) {
+            String msg = "Error creating XML response: " + e.getMessage();
+            exchange.setException(new SalesforceException(msg, e));
+        } finally {
+            // cleanup temporary exchange headers
+            exchange.removeProperty(RESPONSE_CLASS);
+            exchange.removeProperty(RESPONSE_ALIAS);
+
+            // consume response entity
+            if (responseEntity != null) {
+                try {
+                    responseEntity.close();
+                } catch (IOException ignored) {
+                }
+            }
+
+            // notify callback that exchange is done
+            callback.done(false);
+        }
+    }
+
+}