You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2013/08/25 18:49:44 UTC

git commit: fixed CS error - only extracted some mothods to reduce the complexity and lenght of mothods

Updated Branches:
  refs/heads/master 2f0970a67 -> 3f8fb8dc9


fixed CS error - only extracted some mothods to reduce the complexity and lenght of mothods


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f8fb8dc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f8fb8dc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f8fb8dc

Branch: refs/heads/master
Commit: 3f8fb8dc9a961fbc772d21af42470269d802767b
Parents: 2f0970a
Author: cmueller <cm...@apache.org>
Authored: Sun Aug 25 18:49:31 2013 +0200
Committer: cmueller <cm...@apache.org>
Committed: Sun Aug 25 18:49:31 2013 +0200

----------------------------------------------------------------------
 .../processor/AbstractRestProcessor.java        | 587 ++++++++++---------
 .../internal/processor/BulkApiProcessor.java    | 543 +++++++++--------
 2 files changed, 609 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f8fb8dc/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
index 9869caf..661a896 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
@@ -78,346 +78,387 @@ public abstract class AbstractRestProcessor extends AbstractSalesforceProcessor
 
         // call Salesforce asynchronously
         try {
-
             // call Operation using REST client
             switch (operationName) {
             case GET_VERSIONS:
-                restClient.getVersions(new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        // process response entity and create out message
-                        processResponse(exchange, response, exception, callback);
-                    }
-                });
+                processGetVersions(exchange, callback);
                 break;
-
             case GET_RESOURCES:
-                restClient.getResources(new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        processResponse(exchange, response, exception, callback);
-                    }
-                });
+                processGetResources(exchange, callback);
                 break;
-
             case GET_GLOBAL_OBJECTS:
-                restClient.getGlobalObjects(new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        processResponse(exchange, response, exception, callback);
-                    }
-                });
+                processGetGlobalObjects(exchange, callback);
                 break;
-
             case GET_BASIC_INFO:
-                String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
-                restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        processResponse(exchange, response, exception, callback);
-                    }
-                });
+                processGetBasicInfo(exchange, callback);
                 break;
-
             case GET_DESCRIPTION:
-                sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
-                restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        processResponse(exchange, response, exception, callback);
-                    }
-                });
+                processGetDescription(exchange, callback);
                 break;
+            case GET_SOBJECT:
+                processGetSobject(exchange, callback);
+                break;
+            case CREATE_SOBJECT:
+                processCreateSobject(exchange, callback);
+                break;
+            case UPDATE_SOBJECT:
+                processUpdateSobject(exchange, callback);
+                break;
+            case DELETE_SOBJECT:
+                processDeleteSobject(exchange, callback);
+                break;
+            case GET_SOBJECT_WITH_ID:
+                processGetSobjectWithId(exchange, callback);
+                break;
+            case UPSERT_SOBJECT:
+                processUpsertSobject(exchange, callback);
+                break;
+            case DELETE_SOBJECT_WITH_ID:
+                processDeleteSobjectWithId(exchange, callback);
+                break;
+            case GET_BLOB_FIELD:
+                processGetBlobField(exchange, callback);
+                break;
+            case QUERY:
+                processQuery(exchange, callback);
+                break;
+            case QUERY_MORE:
+                processQueryMore(exchange, callback);
+                break;
+            case SEARCH:
+                processSearch(exchange, callback);
+                break;
+            default:
+                throw new SalesforceException("Unknow operation name: " + operationName, null);
+            }
+        } catch (SalesforceException e) {
+            exchange.setException(new SalesforceException(
+                    String.format("Error processing %s: [%s] \"%s\"",
+                            operationName, e.getStatusCode(), e.getMessage()),
+                    e));
+            callback.done(true);
+            return true;
+        } catch (RuntimeException e) {
+            exchange.setException(new SalesforceException(
+                    String.format("Unexpected Error processing %s: \"%s\"",
+                            operationName, e.getMessage()),
+                    e));
+            callback.done(true);
+            return true;
+        }
 
-            case GET_SOBJECT: {
-                String sObjectIdValue;
-                // determine parameters from input AbstractSObject
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    sObjectIdValue = sObjectBase.getId();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                final String sObjectId = sObjectIdValue;
-
-                // use sObject name to load class
-                setResponseClass(exchange, sObjectName);
-
-                // get optional field list
-                String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
-                String[] fields = null;
-                if (fieldsValue != null) {
-                    fields = fieldsValue.split(",");
-                }
+        // continue routing asynchronously
+        return false;
+    }
 
-                restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream response, SalesforceException exception) {
-                        processResponse(exchange, response, exception, callback);
-                        restoreFields(exchange, sObjectBase, sObjectId, null, null);
-                    }
-                });
+    private void processGetVersions(final Exchange exchange, final AsyncCallback callback) {
+        restClient.getVersions(new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                // process response entity and create out message
+                processResponse(exchange, response, exception, callback);
+            }
+        });
+    }
 
-                break;
+    private void processGetResources(final Exchange exchange, final AsyncCallback callback) {
+        restClient.getResources(new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
             }
+        });
+    }
 
-            case CREATE_SOBJECT: {
-                // determine parameters from input AbstractSObject
-                AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                }
+    private void processGetGlobalObjects(final Exchange exchange, final AsyncCallback callback) {
+        restClient.getGlobalObjects(new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+            }
+        });
+    }
 
-                restClient.createSObject(sObjectName, getRequestStream(exchange),
-                        new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                            }
-                        });
+    private void processGetBasicInfo(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+        restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+            }
+        });
+    }
 
-                break;
+    private void processGetDescription(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+        restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
             }
+        });
+    }
 
-            case UPDATE_SOBJECT: {
-                // determine parameters from input AbstractSObject
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                String sObjectId;
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    // remember the sObject Id
-                    sObjectId = sObjectBase.getId();
-                    // clear base object fields, which cannot be updated
-                    sObjectBase.clearBaseFields();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                }
+    private void processGetSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        String sObjectIdValue;
+        // determine parameters from input AbstractSObject
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            sObjectIdValue = sObjectBase.getId();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        final String sObjectId = sObjectIdValue;
 
-                final String finalsObjectId = sObjectId;
-                restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
-                        new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                                restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
-                            }
-                        });
+        // use sObject name to load class
+        setResponseClass(exchange, sObjectName);
 
-                break;
+        // get optional field list
+        String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
+        String[] fields = null;
+        if (fieldsValue != null) {
+            fields = fieldsValue.split(",");
+        }
+
+        restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+                restoreFields(exchange, sObjectBase, sObjectId, null, null);
             }
+        });
+    }
 
-            case DELETE_SOBJECT: {
-                // determine parameters from input AbstractSObject
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                String sObjectIdValue;
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    sObjectIdValue = sObjectBase.getId();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                final String sObjectId = sObjectIdValue;
+    private void processCreateSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        // determine parameters from input AbstractSObject
+        AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+        }
 
-                restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
+        restClient.createSObject(sObjectName, getRequestStream(exchange),
+                new RestClient.ResponseCallback() {
                     @Override
                     public void onResponse(InputStream response, SalesforceException exception) {
                         processResponse(exchange, response, exception, callback);
-                        restoreFields(exchange, sObjectBase, sObjectId, null, null);
                     }
                 });
-                break;
-            }
-
-            case GET_SOBJECT_WITH_ID: {
-                Object oldValue = null;
-                String sObjectExtIdValue;
-                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
-                        exchange, IGNORE_BODY, NOT_OPTIONAL);
-
-                // determine parameters from input AbstractSObject
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
-                    sObjectExtIdValue = oldValue.toString();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-
-                // use sObject name to load class
-                setResponseClass(exchange, sObjectName);
-
-                final Object finalOldValue = oldValue;
-                restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
-                        new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                                restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
-                            }
-                        });
+    }
 
-                break;
-            }
+    private void processUpdateSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        // determine parameters from input AbstractSObject
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        String sObjectId;
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            // remember the sObject Id
+            sObjectId = sObjectBase.getId();
+            // clear base object fields, which cannot be updated
+            sObjectBase.clearBaseFields();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+        }
 
-            case UPSERT_SOBJECT: {
-                String sObjectExtIdValue;
-                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
-                        IGNORE_BODY, NOT_OPTIONAL);
-
-                // determine parameters from input AbstractSObject
-                Object oldValue = null;
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
-                    sObjectExtIdValue = oldValue.toString();
-                    // clear base object fields, which cannot be updated
-                    sObjectBase.clearBaseFields();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                }
+        final String finalsObjectId = sObjectId;
+        restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
+                new RestClient.ResponseCallback() {
+                    @Override
+                    public void onResponse(InputStream response, SalesforceException exception) {
+                        processResponse(exchange, response, exception, callback);
+                        restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
+                    }
+                });
+    }
 
-                final Object finalOldValue = oldValue;
-                restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue, getRequestStream(exchange),
-                    new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                                restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
-                            }
-                        });
+    private void processDeleteSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        // determine parameters from input AbstractSObject
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        String sObjectIdValue;
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            sObjectIdValue = sObjectBase.getId();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        final String sObjectId = sObjectIdValue;
 
-                break;
+        restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+                restoreFields(exchange, sObjectBase, sObjectId, null, null);
             }
+        });
+    }
 
-            case DELETE_SOBJECT_WITH_ID: {
-                final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-
-                // determine parameters from input AbstractSObject
-                Object oldValue = null;
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                String sObjectExtIdValue;
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
-                    sObjectExtIdValue = oldValue.toString();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
-                }
+    private void processGetSobjectWithId(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        Object oldValue = null;
+        String sObjectExtIdValue;
+        final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
+                exchange, IGNORE_BODY, NOT_OPTIONAL);
 
-                final Object finalOldValue = oldValue;
-                restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
-                        new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                                restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
-                            }
-                        });
+        // determine parameters from input AbstractSObject
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+            sObjectExtIdValue = oldValue.toString();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+        }
 
-                break;
-            }
+        // use sObject name to load class
+        setResponseClass(exchange, sObjectName);
 
-            case GET_BLOB_FIELD: {
-                // get blob field name
-                final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
-                        exchange, IGNORE_BODY, NOT_OPTIONAL);
-
-                // determine parameters from input AbstractSObject
-                final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
-                String sObjectIdValue;
-                if (sObjectBase != null) {
-                    sObjectName = sObjectBase.getClass().getSimpleName();
-                    sObjectIdValue = sObjectBase.getId();
-                } else {
-                    sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                final String sObjectId = sObjectIdValue;
-
-                restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
-                        new RestClient.ResponseCallback() {
-                            @Override
-                            public void onResponse(InputStream response, SalesforceException exception) {
-                                processResponse(exchange, response, exception, callback);
-                                restoreFields(exchange, sObjectBase, sObjectId, null, null);
-                            }
-                        });
-                break;
-            }
+        final Object finalOldValue = oldValue;
+        restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                new RestClient.ResponseCallback() {
+                    @Override
+                    public void onResponse(InputStream response, SalesforceException exception) {
+                        processResponse(exchange, response, exception, callback);
+                        restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
+                    }
+                });
+    }
 
-            case QUERY:
-                final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+    private void processUpsertSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        String sObjectExtIdValue;
+        final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
+                IGNORE_BODY, NOT_OPTIONAL);
 
-                // use sObject name to load class
-                setResponseClass(exchange, null);
+        // determine parameters from input AbstractSObject
+        Object oldValue = null;
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+            sObjectExtIdValue = oldValue.toString();
+            // clear base object fields, which cannot be updated
+            sObjectBase.clearBaseFields();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
+        }
 
-                restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
+        final Object finalOldValue = oldValue;
+        restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue, getRequestStream(exchange),
+            new RestClient.ResponseCallback() {
                     @Override
                     public void onResponse(InputStream response, SalesforceException exception) {
                         processResponse(exchange, response, exception, callback);
+                        restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
                     }
                 });
-                break;
+    }
 
-            case QUERY_MORE:
-                // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
-                final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+    private void processDeleteSobjectWithId(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
 
-                // use custom response class property
-                setResponseClass(exchange, null);
+        // determine parameters from input AbstractSObject
+        Object oldValue = null;
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        String sObjectExtIdValue;
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+            sObjectExtIdValue = oldValue.toString();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+        }
 
-                restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
+        final Object finalOldValue = oldValue;
+        restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+                new RestClient.ResponseCallback() {
                     @Override
                     public void onResponse(InputStream response, SalesforceException exception) {
                         processResponse(exchange, response, exception, callback);
+                        restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
                     }
                 });
-                break;
+    }
 
-            case SEARCH:
-                final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+    private void processGetBlobField(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String sObjectName;
+        // get blob field name
+        final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
+                exchange, IGNORE_BODY, NOT_OPTIONAL);
 
-                restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
+        // determine parameters from input AbstractSObject
+        final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+        String sObjectIdValue;
+        if (sObjectBase != null) {
+            sObjectName = sObjectBase.getClass().getSimpleName();
+            sObjectIdValue = sObjectBase.getId();
+        } else {
+            sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        final String sObjectId = sObjectIdValue;
+
+        restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
+                new RestClient.ResponseCallback() {
                     @Override
                     public void onResponse(InputStream response, SalesforceException exception) {
                         processResponse(exchange, response, exception, callback);
+                        restoreFields(exchange, sObjectBase, sObjectId, null, null);
                     }
                 });
-                break;
+    }
 
-            default:
-                throw new SalesforceException("Unknow operation name: " + operationName, null);
+    private void processQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+        // use sObject name to load class
+        setResponseClass(exchange, null);
 
+        restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
             }
-        } catch (SalesforceException e) {
-            exchange.setException(new SalesforceException(
-                    String.format("Error processing %s: [%s] \"%s\"",
-                            operationName, e.getStatusCode(), e.getMessage()),
-                    e));
-            callback.done(true);
-            return true;
-        } catch (RuntimeException e) {
-            exchange.setException(new SalesforceException(
-                    String.format("Unexpected Error processing %s: \"%s\"",
-                            operationName, e.getMessage()),
-                    e));
-            callback.done(true);
-            return true;
-        }
+        });
+    }
 
-        // continue routing asynchronously
-        return false;
+    private void processQueryMore(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
+        final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+        // use custom response class property
+        setResponseClass(exchange, null);
+
+        restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+            }
+        });
+    }
+
+    private void processSearch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+
+        restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException exception) {
+                processResponse(exchange, response, exception, callback);
+            }
+        });
     }
 
     private void restoreFields(Exchange exchange, AbstractSObjectBase sObjectBase,

http://git-wip-us.apache.org/repos/asf/camel/blob/3f8fb8dc/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
index 979f531..80d382d 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
@@ -61,279 +61,43 @@ public class BulkApiProcessor extends AbstractSalesforceProcessor {
         try {
             switch (operationName) {
             case CREATE_JOB:
-                JobInfo jobBody = exchange.getIn().getMandatoryBody(JobInfo.class);
-                bulkClient.createJob(jobBody, new BulkApiClient.JobInfoResponseCallback() {
-                    @Override
-                    public void onResponse(JobInfo jobInfo, SalesforceException ex) {
-                        processResponse(exchange, jobInfo, ex, callback);
-                    }
-                });
-
+                processCreateJob(exchange, callback);
                 break;
-
             case GET_JOB:
-                jobBody = exchange.getIn().getBody(JobInfo.class);
-                String jobId;
-                if (jobBody != null) {
-                    jobId = jobBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
-                    @Override
-                    public void onResponse(JobInfo jobInfo, SalesforceException ex) {
-                        processResponse(exchange, jobInfo, ex, callback);
-                    }
-                });
-
+                processGetJob(exchange, callback);
                 break;
-
             case CLOSE_JOB:
-                jobBody = exchange.getIn().getBody(JobInfo.class);
-                if (jobBody != null) {
-                    jobId = jobBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.closeJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
-                    @Override
-                    public void onResponse(JobInfo jobInfo, SalesforceException ex) {
-                        processResponse(exchange, jobInfo, ex, callback);
-                    }
-                });
-
+                processCloseJob(exchange, callback);
                 break;
-
             case ABORT_JOB:
-                jobBody = exchange.getIn().getBody(JobInfo.class);
-                if (jobBody != null) {
-                    jobId = jobBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.abortJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
-                    @Override
-                    public void onResponse(JobInfo jobInfo, SalesforceException ex) {
-                        processResponse(exchange, jobInfo, ex, callback);
-                    }
-                });
-
+                processAbortJob(exchange, callback);
                 break;
-
             case CREATE_BATCH:
-                // since request is in the body, use headers or endpoint params
-                ContentType contentType = ContentType.fromValue(
-                        getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
-                jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-
-                InputStream request;
-                try {
-                    request = exchange.getIn().getMandatoryBody(InputStream.class);
-                } catch (CamelException e) {
-                    String msg = "Error preparing batch request: " + e.getMessage();
-                    throw new SalesforceException(msg, e);
-                }
-
-                bulkClient.createBatch(request, jobId, contentType, new BulkApiClient.BatchInfoResponseCallback() {
-                    @Override
-                    public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
-                        processResponse(exchange, batchInfo, ex, callback);
-                    }
-                });
-
+                processCreateBatch(exchange, callback);
                 break;
-
             case GET_BATCH:
-                BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
-                String batchId;
-                if (batchBody != null) {
-                    jobId = batchBody.getJobId();
-                    batchId = batchBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getBatch(jobId, batchId, new BulkApiClient.BatchInfoResponseCallback() {
-                    @Override
-                    public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
-                        processResponse(exchange, batchInfo, ex, callback);
-                    }
-                });
-
+                processGetBatch(exchange, callback);
                 break;
-
             case GET_ALL_BATCHES:
-                jobBody = exchange.getIn().getBody(JobInfo.class);
-                if (jobBody != null) {
-                    jobId = jobBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getAllBatches(jobId, new BulkApiClient.BatchInfoListResponseCallback() {
-                    @Override
-                    public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
-                        processResponse(exchange, batchInfoList, ex, callback);
-                    }
-                });
-
+                processGetAllBatches(exchange, callback);
                 break;
-
             case GET_REQUEST:
-                batchBody = exchange.getIn().getBody(BatchInfo.class);
-                if (batchBody != null) {
-                    jobId = batchBody.getJobId();
-                    batchId = batchBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-
-                bulkClient.getRequest(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream inputStream, SalesforceException ex) {
-                        // read the request stream into a StreamCache temp file
-                        // ensures the connection is read
-                        StreamCache body = null;
-                        if (inputStream != null) {
-                            try {
-                                body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
-                            } catch (IOException e) {
-                                String msg = "Error retrieving batch request: " + e.getMessage();
-                                ex = new SalesforceException(msg, e);
-                            } finally {
-                                // close the input stream to release the Http connection
-                                try {
-                                    inputStream.close();
-                                } catch (IOException ignore) {
-                                }
-                            }
-                        }
-                        processResponse(exchange, body, ex, callback);
-                    }
-                });
-
+                processGetRequest(exchange, callback);
                 break;
-
             case GET_RESULTS:
-                batchBody = exchange.getIn().getBody(BatchInfo.class);
-                if (batchBody != null) {
-                    jobId = batchBody.getJobId();
-                    batchId = batchBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getResults(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream inputStream, SalesforceException ex) {
-                        // read the result stream into a StreamCache temp file
-                        // ensures the connection is read
-                        StreamCache body = null;
-                        if (inputStream != null) {
-                            try {
-                                body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
-                            } catch (IOException e) {
-                                String msg = "Error retrieving batch results: " + e.getMessage();
-                                ex = new SalesforceException(msg, e);
-                            } finally {
-                                // close the input stream to release the Http connection
-                                try {
-                                    inputStream.close();
-                                } catch (IOException ignore) {
-                                }
-                            }
-                        }
-                        processResponse(exchange, body, ex, callback);
-                    }
-                });
-
+                processGetResults(exchange, callback);
                 break;
-
             case CREATE_BATCH_QUERY:
-                jobBody = exchange.getIn().getBody(JobInfo.class);
-                String soqlQuery;
-                if (jobBody != null) {
-                    jobId = jobBody.getId();
-                    contentType = jobBody.getContentType();
-                    // use SOQL query from header or endpoint config
-                    soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    contentType = ContentType.fromValue(
-                            getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
-                    // reuse SOBJECT_QUERY property
-                    soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
-                        new BulkApiClient.BatchInfoResponseCallback() {
-                            @Override
-                            public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
-                                processResponse(exchange, batchInfo, ex, callback);
-                            }
-                        });
-
+                processCreateBatchQuery(exchange, callback);
                 break;
-
             case GET_QUERY_RESULT_IDS:
-                batchBody = exchange.getIn().getBody(BatchInfo.class);
-                if (batchBody != null) {
-                    jobId = batchBody.getJobId();
-                    batchId = batchBody.getId();
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
-                    @Override
-                    public void onResponse(List<String> ids, SalesforceException ex) {
-                        processResponse(exchange, ids, ex, callback);
-                    }
-                });
-
+                processGetQueryResultIds(exchange, callback);
                 break;
-
             case GET_QUERY_RESULT:
-                batchBody = exchange.getIn().getBody(BatchInfo.class);
-                String resultId;
-                if (batchBody != null) {
-                    jobId = batchBody.getJobId();
-                    batchId = batchBody.getId();
-                    resultId = getParameter(RESULT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                } else {
-                    jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    batchId = getParameter(BATCH_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-                    resultId = getParameter(RESULT_ID, exchange, USE_BODY, NOT_OPTIONAL);
-                }
-                bulkClient.getQueryResult(jobId, batchId, resultId, new BulkApiClient.StreamResponseCallback() {
-                    @Override
-                    public void onResponse(InputStream inputStream, SalesforceException ex) {
-                        StreamCache body = null;
-                        if (inputStream != null) {
-                            // read the result stream into a StreamCache temp file
-                            // ensures the connection is read
-                            try {
-                                body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
-                            } catch (IOException e) {
-                                String msg = "Error retrieving query result: " + e.getMessage();
-                                ex = new SalesforceException(msg, e);
-                            } finally {
-                                // close the input stream to release the Http connection
-                                try {
-                                    inputStream.close();
-                                } catch (IOException e) {
-                                    // ignore
-                                }
-                            }
-                        }
-                        processResponse(exchange, body, ex, callback);
-                    }
-                });
-
+                processGetQueryResult(exchange, callback);
                 break;
-
             default:
                 throw new SalesforceException("Unknow operation name: " + operationName, null);
-
             }
         } catch (SalesforceException e) {
             exchange.setException(new SalesforceException(
@@ -359,6 +123,289 @@ public class BulkApiProcessor extends AbstractSalesforceProcessor {
         return done;
     }
 
+    private void processCreateJob(final Exchange exchange, final AsyncCallback callback) throws InvalidPayloadException {
+        JobInfo jobBody = exchange.getIn().getMandatoryBody(JobInfo.class);
+        bulkClient.createJob(jobBody, new BulkApiClient.JobInfoResponseCallback() {
+            @Override
+            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                processResponse(exchange, jobInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processGetJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        JobInfo jobBody;
+        jobBody = exchange.getIn().getBody(JobInfo.class);
+        String jobId;
+        if (jobBody != null) {
+            jobId = jobBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+            @Override
+            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                processResponse(exchange, jobInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processCloseJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        JobInfo jobBody;
+        String jobId;
+        jobBody = exchange.getIn().getBody(JobInfo.class);
+        if (jobBody != null) {
+            jobId = jobBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.closeJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+            @Override
+            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                processResponse(exchange, jobInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processAbortJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        JobInfo jobBody;
+        String jobId;
+        jobBody = exchange.getIn().getBody(JobInfo.class);
+        if (jobBody != null) {
+            jobId = jobBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.abortJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+            @Override
+            public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+                processResponse(exchange, jobInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processCreateBatch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        // since request is in the body, use headers or endpoint params
+        ContentType contentType = ContentType.fromValue(
+                getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+        jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+        InputStream request;
+        try {
+            request = exchange.getIn().getMandatoryBody(InputStream.class);
+        } catch (CamelException e) {
+            String msg = "Error preparing batch request: " + e.getMessage();
+            throw new SalesforceException(msg, e);
+        }
+
+        bulkClient.createBatch(request, jobId, contentType, new BulkApiClient.BatchInfoResponseCallback() {
+            @Override
+            public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                processResponse(exchange, batchInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processGetBatch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+        String batchId;
+        if (batchBody != null) {
+            jobId = batchBody.getJobId();
+            batchId = batchBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getBatch(jobId, batchId, new BulkApiClient.BatchInfoResponseCallback() {
+            @Override
+            public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                processResponse(exchange, batchInfo, ex, callback);
+            }
+        });
+    }
+
+    private void processGetAllBatches(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        JobInfo jobBody;
+        String jobId;
+        jobBody = exchange.getIn().getBody(JobInfo.class);
+        if (jobBody != null) {
+            jobId = jobBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getAllBatches(jobId, new BulkApiClient.BatchInfoListResponseCallback() {
+            @Override
+            public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
+                processResponse(exchange, batchInfoList, ex, callback);
+            }
+        });
+    }
+
+    private void processGetRequest(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        BatchInfo batchBody;
+        String batchId;
+        batchBody = exchange.getIn().getBody(BatchInfo.class);
+        if (batchBody != null) {
+            jobId = batchBody.getJobId();
+            batchId = batchBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+
+        bulkClient.getRequest(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+            @Override
+            public void onResponse(InputStream inputStream, SalesforceException ex) {
+                // read the request stream into a StreamCache temp file
+                // ensures the connection is read
+                StreamCache body = null;
+                if (inputStream != null) {
+                    try {
+                        body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                    } catch (IOException e) {
+                        String msg = "Error retrieving batch request: " + e.getMessage();
+                        ex = new SalesforceException(msg, e);
+                    } finally {
+                        // close the input stream to release the Http connection
+                        try {
+                            inputStream.close();
+                        } catch (IOException ignore) {
+                        }
+                    }
+                }
+                processResponse(exchange, body, ex, callback);
+            }
+        });
+    }
+
+    private void processGetResults(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        BatchInfo batchBody;
+        String batchId;
+        batchBody = exchange.getIn().getBody(BatchInfo.class);
+        if (batchBody != null) {
+            jobId = batchBody.getJobId();
+            batchId = batchBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getResults(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+            @Override
+            public void onResponse(InputStream inputStream, SalesforceException ex) {
+                // read the result stream into a StreamCache temp file
+                // ensures the connection is read
+                StreamCache body = null;
+                if (inputStream != null) {
+                    try {
+                        body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                    } catch (IOException e) {
+                        String msg = "Error retrieving batch results: " + e.getMessage();
+                        ex = new SalesforceException(msg, e);
+                    } finally {
+                        // close the input stream to release the Http connection
+                        try {
+                            inputStream.close();
+                        } catch (IOException ignore) {
+                        }
+                    }
+                }
+                processResponse(exchange, body, ex, callback);
+            }
+        });
+    }
+
+    private void processCreateBatchQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        JobInfo jobBody;
+        String jobId;
+        ContentType contentType;
+        jobBody = exchange.getIn().getBody(JobInfo.class);
+        String soqlQuery;
+        if (jobBody != null) {
+            jobId = jobBody.getId();
+            contentType = jobBody.getContentType();
+            // use SOQL query from header or endpoint config
+            soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            contentType = ContentType.fromValue(
+                    getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+            // reuse SOBJECT_QUERY property
+            soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
+                new BulkApiClient.BatchInfoResponseCallback() {
+                    @Override
+                    public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+                        processResponse(exchange, batchInfo, ex, callback);
+                    }
+                });
+    }
+
+    private void processGetQueryResultIds(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        BatchInfo batchBody;
+        String batchId;
+        batchBody = exchange.getIn().getBody(BatchInfo.class);
+        if (batchBody != null) {
+            jobId = batchBody.getJobId();
+            batchId = batchBody.getId();
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
+            @Override
+            public void onResponse(List<String> ids, SalesforceException ex) {
+                processResponse(exchange, ids, ex, callback);
+            }
+        });
+    }
+
+    private void processGetQueryResult(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+        String jobId;
+        BatchInfo batchBody;
+        String batchId;
+        batchBody = exchange.getIn().getBody(BatchInfo.class);
+        String resultId;
+        if (batchBody != null) {
+            jobId = batchBody.getJobId();
+            batchId = batchBody.getId();
+            resultId = getParameter(RESULT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+        } else {
+            jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            batchId = getParameter(BATCH_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+            resultId = getParameter(RESULT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+        }
+        bulkClient.getQueryResult(jobId, batchId, resultId, new BulkApiClient.StreamResponseCallback() {
+            @Override
+            public void onResponse(InputStream inputStream, SalesforceException ex) {
+                StreamCache body = null;
+                if (inputStream != null) {
+                    // read the result stream into a StreamCache temp file
+                    // ensures the connection is read
+                    try {
+                        body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+                    } catch (IOException e) {
+                        String msg = "Error retrieving query result: " + e.getMessage();
+                        ex = new SalesforceException(msg, e);
+                    } finally {
+                        // close the input stream to release the Http connection
+                        try {
+                            inputStream.close();
+                        } catch (IOException e) {
+                            // ignore
+                        }
+                    }
+                }
+                processResponse(exchange, body, ex, callback);
+            }
+        });
+    }
+
     private void processResponse(Exchange exchange, Object body, SalesforceException ex, AsyncCallback callback) {
         final Message out = exchange.getOut();
         if (ex != null) {