You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2013/08/25 18:49:44 UTC
git commit: fixed CS error - only extracted some mothods to reduce
the complexity and lenght of mothods
Updated Branches:
refs/heads/master 2f0970a67 -> 3f8fb8dc9
fixed CS error - only extracted some mothods to reduce the complexity and lenght of mothods
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f8fb8dc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f8fb8dc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f8fb8dc
Branch: refs/heads/master
Commit: 3f8fb8dc9a961fbc772d21af42470269d802767b
Parents: 2f0970a
Author: cmueller <cm...@apache.org>
Authored: Sun Aug 25 18:49:31 2013 +0200
Committer: cmueller <cm...@apache.org>
Committed: Sun Aug 25 18:49:31 2013 +0200
----------------------------------------------------------------------
.../processor/AbstractRestProcessor.java | 587 ++++++++++---------
.../internal/processor/BulkApiProcessor.java | 543 +++++++++--------
2 files changed, 609 insertions(+), 521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3f8fb8dc/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
index 9869caf..661a896 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractRestProcessor.java
@@ -78,346 +78,387 @@ public abstract class AbstractRestProcessor extends AbstractSalesforceProcessor
// call Salesforce asynchronously
try {
-
// call Operation using REST client
switch (operationName) {
case GET_VERSIONS:
- restClient.getVersions(new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- // process response entity and create out message
- processResponse(exchange, response, exception, callback);
- }
- });
+ processGetVersions(exchange, callback);
break;
-
case GET_RESOURCES:
- restClient.getResources(new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- }
- });
+ processGetResources(exchange, callback);
break;
-
case GET_GLOBAL_OBJECTS:
- restClient.getGlobalObjects(new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- }
- });
+ processGetGlobalObjects(exchange, callback);
break;
-
case GET_BASIC_INFO:
- String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
- restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- }
- });
+ processGetBasicInfo(exchange, callback);
break;
-
case GET_DESCRIPTION:
- sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
- restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- }
- });
+ processGetDescription(exchange, callback);
break;
+ case GET_SOBJECT:
+ processGetSobject(exchange, callback);
+ break;
+ case CREATE_SOBJECT:
+ processCreateSobject(exchange, callback);
+ break;
+ case UPDATE_SOBJECT:
+ processUpdateSobject(exchange, callback);
+ break;
+ case DELETE_SOBJECT:
+ processDeleteSobject(exchange, callback);
+ break;
+ case GET_SOBJECT_WITH_ID:
+ processGetSobjectWithId(exchange, callback);
+ break;
+ case UPSERT_SOBJECT:
+ processUpsertSobject(exchange, callback);
+ break;
+ case DELETE_SOBJECT_WITH_ID:
+ processDeleteSobjectWithId(exchange, callback);
+ break;
+ case GET_BLOB_FIELD:
+ processGetBlobField(exchange, callback);
+ break;
+ case QUERY:
+ processQuery(exchange, callback);
+ break;
+ case QUERY_MORE:
+ processQueryMore(exchange, callback);
+ break;
+ case SEARCH:
+ processSearch(exchange, callback);
+ break;
+ default:
+ throw new SalesforceException("Unknow operation name: " + operationName, null);
+ }
+ } catch (SalesforceException e) {
+ exchange.setException(new SalesforceException(
+ String.format("Error processing %s: [%s] \"%s\"",
+ operationName, e.getStatusCode(), e.getMessage()),
+ e));
+ callback.done(true);
+ return true;
+ } catch (RuntimeException e) {
+ exchange.setException(new SalesforceException(
+ String.format("Unexpected Error processing %s: \"%s\"",
+ operationName, e.getMessage()),
+ e));
+ callback.done(true);
+ return true;
+ }
- case GET_SOBJECT: {
- String sObjectIdValue;
- // determine parameters from input AbstractSObject
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- sObjectIdValue = sObjectBase.getId();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- final String sObjectId = sObjectIdValue;
-
- // use sObject name to load class
- setResponseClass(exchange, sObjectName);
-
- // get optional field list
- String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
- String[] fields = null;
- if (fieldsValue != null) {
- fields = fieldsValue.split(",");
- }
+ // continue routing asynchronously
+ return false;
+ }
- restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, sObjectId, null, null);
- }
- });
+ private void processGetVersions(final Exchange exchange, final AsyncCallback callback) {
+ restClient.getVersions(new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ // process response entity and create out message
+ processResponse(exchange, response, exception, callback);
+ }
+ });
+ }
- break;
+ private void processGetResources(final Exchange exchange, final AsyncCallback callback) {
+ restClient.getResources(new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
}
+ });
+ }
- case CREATE_SOBJECT: {
- // determine parameters from input AbstractSObject
- AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- }
+ private void processGetGlobalObjects(final Exchange exchange, final AsyncCallback callback) {
+ restClient.getGlobalObjects(new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ }
+ });
+ }
- restClient.createSObject(sObjectName, getRequestStream(exchange),
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- }
- });
+ private void processGetBasicInfo(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+ restClient.getBasicInfo(sObjectName, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ }
+ });
+ }
- break;
+ private void processGetDescription(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ sObjectName = getParameter(SOBJECT_NAME, exchange, USE_BODY, NOT_OPTIONAL);
+ restClient.getDescription(sObjectName, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
}
+ });
+ }
- case UPDATE_SOBJECT: {
- // determine parameters from input AbstractSObject
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- String sObjectId;
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- // remember the sObject Id
- sObjectId = sObjectBase.getId();
- // clear base object fields, which cannot be updated
- sObjectBase.clearBaseFields();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- }
+ private void processGetSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ String sObjectIdValue;
+ // determine parameters from input AbstractSObject
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ sObjectIdValue = sObjectBase.getId();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ final String sObjectId = sObjectIdValue;
- final String finalsObjectId = sObjectId;
- restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
- }
- });
+ // use sObject name to load class
+ setResponseClass(exchange, sObjectName);
- break;
+ // get optional field list
+ String fieldsValue = getParameter(SOBJECT_FIELDS, exchange, IGNORE_BODY, IS_OPTIONAL);
+ String[] fields = null;
+ if (fieldsValue != null) {
+ fields = fieldsValue.split(",");
+ }
+
+ restClient.getSObject(sObjectName, sObjectId, fields, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, sObjectId, null, null);
}
+ });
+ }
- case DELETE_SOBJECT: {
- // determine parameters from input AbstractSObject
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- String sObjectIdValue;
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- sObjectIdValue = sObjectBase.getId();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- final String sObjectId = sObjectIdValue;
+ private void processCreateSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ // determine parameters from input AbstractSObject
+ AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ }
- restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
+ restClient.createSObject(sObjectName, getRequestStream(exchange),
+ new RestClient.ResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException exception) {
processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, sObjectId, null, null);
}
});
- break;
- }
-
- case GET_SOBJECT_WITH_ID: {
- Object oldValue = null;
- String sObjectExtIdValue;
- final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
- exchange, IGNORE_BODY, NOT_OPTIONAL);
-
- // determine parameters from input AbstractSObject
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
- sObjectExtIdValue = oldValue.toString();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
- }
-
- // use sObject name to load class
- setResponseClass(exchange, sObjectName);
-
- final Object finalOldValue = oldValue;
- restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
- }
- });
+ }
- break;
- }
+ private void processUpdateSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ // determine parameters from input AbstractSObject
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ String sObjectId;
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ // remember the sObject Id
+ sObjectId = sObjectBase.getId();
+ // clear base object fields, which cannot be updated
+ sObjectBase.clearBaseFields();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectId = getParameter(SOBJECT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ }
- case UPSERT_SOBJECT: {
- String sObjectExtIdValue;
- final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
- IGNORE_BODY, NOT_OPTIONAL);
-
- // determine parameters from input AbstractSObject
- Object oldValue = null;
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
- sObjectExtIdValue = oldValue.toString();
- // clear base object fields, which cannot be updated
- sObjectBase.clearBaseFields();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
- }
+ final String finalsObjectId = sObjectId;
+ restClient.updateSObject(sObjectName, sObjectId, getRequestStream(exchange),
+ new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, finalsObjectId, null, null);
+ }
+ });
+ }
- final Object finalOldValue = oldValue;
- restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue, getRequestStream(exchange),
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
- }
- });
+ private void processDeleteSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ // determine parameters from input AbstractSObject
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ String sObjectIdValue;
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ sObjectIdValue = sObjectBase.getId();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ final String sObjectId = sObjectIdValue;
- break;
+ restClient.deleteSObject(sObjectName, sObjectId, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, sObjectId, null, null);
}
+ });
+ }
- case DELETE_SOBJECT_WITH_ID: {
- final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
-
- // determine parameters from input AbstractSObject
- Object oldValue = null;
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- String sObjectExtIdValue;
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
- sObjectExtIdValue = oldValue.toString();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
- }
+ private void processGetSobjectWithId(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ Object oldValue = null;
+ String sObjectExtIdValue;
+ final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME,
+ exchange, IGNORE_BODY, NOT_OPTIONAL);
- final Object finalOldValue = oldValue;
- restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
- }
- });
+ // determine parameters from input AbstractSObject
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+ sObjectExtIdValue = oldValue.toString();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+ }
- break;
- }
+ // use sObject name to load class
+ setResponseClass(exchange, sObjectName);
- case GET_BLOB_FIELD: {
- // get blob field name
- final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
- exchange, IGNORE_BODY, NOT_OPTIONAL);
-
- // determine parameters from input AbstractSObject
- final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
- String sObjectIdValue;
- if (sObjectBase != null) {
- sObjectName = sObjectBase.getClass().getSimpleName();
- sObjectIdValue = sObjectBase.getId();
- } else {
- sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- final String sObjectId = sObjectIdValue;
-
- restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
- new RestClient.ResponseCallback() {
- @Override
- public void onResponse(InputStream response, SalesforceException exception) {
- processResponse(exchange, response, exception, callback);
- restoreFields(exchange, sObjectBase, sObjectId, null, null);
- }
- });
- break;
- }
+ final Object finalOldValue = oldValue;
+ restClient.getSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+ new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
+ }
+ });
+ }
- case QUERY:
- final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+ private void processUpsertSobject(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ String sObjectExtIdValue;
+ final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange,
+ IGNORE_BODY, NOT_OPTIONAL);
- // use sObject name to load class
- setResponseClass(exchange, null);
+ // determine parameters from input AbstractSObject
+ Object oldValue = null;
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+ sObjectExtIdValue = oldValue.toString();
+ // clear base object fields, which cannot be updated
+ sObjectBase.clearBaseFields();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ }
- restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
+ final Object finalOldValue = oldValue;
+ restClient.upsertSObject(sObjectName, sObjectExtIdName, sObjectExtIdValue, getRequestStream(exchange),
+ new RestClient.ResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException exception) {
processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
}
});
- break;
+ }
- case QUERY_MORE:
- // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
- final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+ private void processDeleteSobjectWithId(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ final String sObjectExtIdName = getParameter(SOBJECT_EXT_ID_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
- // use custom response class property
- setResponseClass(exchange, null);
+ // determine parameters from input AbstractSObject
+ Object oldValue = null;
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ String sObjectExtIdValue;
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ oldValue = getAndClearPropertyValue(sObjectBase, sObjectExtIdName);
+ sObjectExtIdValue = oldValue.toString();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectExtIdValue = getParameter(SOBJECT_EXT_ID_VALUE, exchange, USE_BODY, NOT_OPTIONAL);
+ }
- restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
+ final Object finalOldValue = oldValue;
+ restClient.deleteSObjectWithId(sObjectName, sObjectExtIdName, sObjectExtIdValue,
+ new RestClient.ResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException exception) {
processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, null, sObjectExtIdName, finalOldValue);
}
});
- break;
+ }
- case SEARCH:
- final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+ private void processGetBlobField(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String sObjectName;
+ // get blob field name
+ final String sObjectBlobFieldName = getParameter(SOBJECT_BLOB_FIELD_NAME,
+ exchange, IGNORE_BODY, NOT_OPTIONAL);
- restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
+ // determine parameters from input AbstractSObject
+ final AbstractSObjectBase sObjectBase = exchange.getIn().getBody(AbstractSObjectBase.class);
+ String sObjectIdValue;
+ if (sObjectBase != null) {
+ sObjectName = sObjectBase.getClass().getSimpleName();
+ sObjectIdValue = sObjectBase.getId();
+ } else {
+ sObjectName = getParameter(SOBJECT_NAME, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ sObjectIdValue = getParameter(SOBJECT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ final String sObjectId = sObjectIdValue;
+
+ restClient.getBlobField(sObjectName, sObjectId, sObjectBlobFieldName,
+ new RestClient.ResponseCallback() {
@Override
public void onResponse(InputStream response, SalesforceException exception) {
processResponse(exchange, response, exception, callback);
+ restoreFields(exchange, sObjectBase, sObjectId, null, null);
}
});
- break;
+ }
- default:
- throw new SalesforceException("Unknow operation name: " + operationName, null);
+ private void processQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ final String sObjectQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+ // use sObject name to load class
+ setResponseClass(exchange, null);
+ restClient.query(sObjectQuery, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
}
- } catch (SalesforceException e) {
- exchange.setException(new SalesforceException(
- String.format("Error processing %s: [%s] \"%s\"",
- operationName, e.getStatusCode(), e.getMessage()),
- e));
- callback.done(true);
- return true;
- } catch (RuntimeException e) {
- exchange.setException(new SalesforceException(
- String.format("Unexpected Error processing %s: \"%s\"",
- operationName, e.getMessage()),
- e));
- callback.done(true);
- return true;
- }
+ });
+ }
- // continue routing asynchronously
- return false;
+ private void processQueryMore(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ // reuse SOBJECT_QUERY parameter name for nextRecordsUrl
+ final String nextRecordsUrl = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+
+ // use custom response class property
+ setResponseClass(exchange, null);
+
+ restClient.queryMore(nextRecordsUrl, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ }
+ });
+ }
+
+ private void processSearch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ final String sObjectSearch = getParameter(SOBJECT_SEARCH, exchange, USE_BODY, NOT_OPTIONAL);
+
+ restClient.search(sObjectSearch, new RestClient.ResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException exception) {
+ processResponse(exchange, response, exception, callback);
+ }
+ });
}
private void restoreFields(Exchange exchange, AbstractSObjectBase sObjectBase,
http://git-wip-us.apache.org/repos/asf/camel/blob/3f8fb8dc/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
index 979f531..80d382d 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/BulkApiProcessor.java
@@ -61,279 +61,43 @@ public class BulkApiProcessor extends AbstractSalesforceProcessor {
try {
switch (operationName) {
case CREATE_JOB:
- JobInfo jobBody = exchange.getIn().getMandatoryBody(JobInfo.class);
- bulkClient.createJob(jobBody, new BulkApiClient.JobInfoResponseCallback() {
- @Override
- public void onResponse(JobInfo jobInfo, SalesforceException ex) {
- processResponse(exchange, jobInfo, ex, callback);
- }
- });
-
+ processCreateJob(exchange, callback);
break;
-
case GET_JOB:
- jobBody = exchange.getIn().getBody(JobInfo.class);
- String jobId;
- if (jobBody != null) {
- jobId = jobBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
- @Override
- public void onResponse(JobInfo jobInfo, SalesforceException ex) {
- processResponse(exchange, jobInfo, ex, callback);
- }
- });
-
+ processGetJob(exchange, callback);
break;
-
case CLOSE_JOB:
- jobBody = exchange.getIn().getBody(JobInfo.class);
- if (jobBody != null) {
- jobId = jobBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.closeJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
- @Override
- public void onResponse(JobInfo jobInfo, SalesforceException ex) {
- processResponse(exchange, jobInfo, ex, callback);
- }
- });
-
+ processCloseJob(exchange, callback);
break;
-
case ABORT_JOB:
- jobBody = exchange.getIn().getBody(JobInfo.class);
- if (jobBody != null) {
- jobId = jobBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.abortJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
- @Override
- public void onResponse(JobInfo jobInfo, SalesforceException ex) {
- processResponse(exchange, jobInfo, ex, callback);
- }
- });
-
+ processAbortJob(exchange, callback);
break;
-
case CREATE_BATCH:
- // since request is in the body, use headers or endpoint params
- ContentType contentType = ContentType.fromValue(
- getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
-
- InputStream request;
- try {
- request = exchange.getIn().getMandatoryBody(InputStream.class);
- } catch (CamelException e) {
- String msg = "Error preparing batch request: " + e.getMessage();
- throw new SalesforceException(msg, e);
- }
-
- bulkClient.createBatch(request, jobId, contentType, new BulkApiClient.BatchInfoResponseCallback() {
- @Override
- public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
- processResponse(exchange, batchInfo, ex, callback);
- }
- });
-
+ processCreateBatch(exchange, callback);
break;
-
case GET_BATCH:
- BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
- String batchId;
- if (batchBody != null) {
- jobId = batchBody.getJobId();
- batchId = batchBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getBatch(jobId, batchId, new BulkApiClient.BatchInfoResponseCallback() {
- @Override
- public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
- processResponse(exchange, batchInfo, ex, callback);
- }
- });
-
+ processGetBatch(exchange, callback);
break;
-
case GET_ALL_BATCHES:
- jobBody = exchange.getIn().getBody(JobInfo.class);
- if (jobBody != null) {
- jobId = jobBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getAllBatches(jobId, new BulkApiClient.BatchInfoListResponseCallback() {
- @Override
- public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
- processResponse(exchange, batchInfoList, ex, callback);
- }
- });
-
+ processGetAllBatches(exchange, callback);
break;
-
case GET_REQUEST:
- batchBody = exchange.getIn().getBody(BatchInfo.class);
- if (batchBody != null) {
- jobId = batchBody.getJobId();
- batchId = batchBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
-
- bulkClient.getRequest(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
- @Override
- public void onResponse(InputStream inputStream, SalesforceException ex) {
- // read the request stream into a StreamCache temp file
- // ensures the connection is read
- StreamCache body = null;
- if (inputStream != null) {
- try {
- body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
- } catch (IOException e) {
- String msg = "Error retrieving batch request: " + e.getMessage();
- ex = new SalesforceException(msg, e);
- } finally {
- // close the input stream to release the Http connection
- try {
- inputStream.close();
- } catch (IOException ignore) {
- }
- }
- }
- processResponse(exchange, body, ex, callback);
- }
- });
-
+ processGetRequest(exchange, callback);
break;
-
case GET_RESULTS:
- batchBody = exchange.getIn().getBody(BatchInfo.class);
- if (batchBody != null) {
- jobId = batchBody.getJobId();
- batchId = batchBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getResults(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
- @Override
- public void onResponse(InputStream inputStream, SalesforceException ex) {
- // read the result stream into a StreamCache temp file
- // ensures the connection is read
- StreamCache body = null;
- if (inputStream != null) {
- try {
- body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
- } catch (IOException e) {
- String msg = "Error retrieving batch results: " + e.getMessage();
- ex = new SalesforceException(msg, e);
- } finally {
- // close the input stream to release the Http connection
- try {
- inputStream.close();
- } catch (IOException ignore) {
- }
- }
- }
- processResponse(exchange, body, ex, callback);
- }
- });
-
+ processGetResults(exchange, callback);
break;
-
case CREATE_BATCH_QUERY:
- jobBody = exchange.getIn().getBody(JobInfo.class);
- String soqlQuery;
- if (jobBody != null) {
- jobId = jobBody.getId();
- contentType = jobBody.getContentType();
- // use SOQL query from header or endpoint config
- soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- contentType = ContentType.fromValue(
- getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
- // reuse SOBJECT_QUERY property
- soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
- new BulkApiClient.BatchInfoResponseCallback() {
- @Override
- public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
- processResponse(exchange, batchInfo, ex, callback);
- }
- });
-
+ processCreateBatchQuery(exchange, callback);
break;
-
case GET_QUERY_RESULT_IDS:
- batchBody = exchange.getIn().getBody(BatchInfo.class);
- if (batchBody != null) {
- jobId = batchBody.getJobId();
- batchId = batchBody.getId();
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
- @Override
- public void onResponse(List<String> ids, SalesforceException ex) {
- processResponse(exchange, ids, ex, callback);
- }
- });
-
+ processGetQueryResultIds(exchange, callback);
break;
-
case GET_QUERY_RESULT:
- batchBody = exchange.getIn().getBody(BatchInfo.class);
- String resultId;
- if (batchBody != null) {
- jobId = batchBody.getJobId();
- batchId = batchBody.getId();
- resultId = getParameter(RESULT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- } else {
- jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- batchId = getParameter(BATCH_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
- resultId = getParameter(RESULT_ID, exchange, USE_BODY, NOT_OPTIONAL);
- }
- bulkClient.getQueryResult(jobId, batchId, resultId, new BulkApiClient.StreamResponseCallback() {
- @Override
- public void onResponse(InputStream inputStream, SalesforceException ex) {
- StreamCache body = null;
- if (inputStream != null) {
- // read the result stream into a StreamCache temp file
- // ensures the connection is read
- try {
- body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
- } catch (IOException e) {
- String msg = "Error retrieving query result: " + e.getMessage();
- ex = new SalesforceException(msg, e);
- } finally {
- // close the input stream to release the Http connection
- try {
- inputStream.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
- processResponse(exchange, body, ex, callback);
- }
- });
-
+ processGetQueryResult(exchange, callback);
break;
-
default:
throw new SalesforceException("Unknow operation name: " + operationName, null);
-
}
} catch (SalesforceException e) {
exchange.setException(new SalesforceException(
@@ -359,6 +123,289 @@ public class BulkApiProcessor extends AbstractSalesforceProcessor {
return done;
}
+ private void processCreateJob(final Exchange exchange, final AsyncCallback callback) throws InvalidPayloadException {
+ JobInfo jobBody = exchange.getIn().getMandatoryBody(JobInfo.class);
+ bulkClient.createJob(jobBody, new BulkApiClient.JobInfoResponseCallback() {
+ @Override
+ public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+ processResponse(exchange, jobInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processGetJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ JobInfo jobBody;
+ jobBody = exchange.getIn().getBody(JobInfo.class);
+ String jobId;
+ if (jobBody != null) {
+ jobId = jobBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+ @Override
+ public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+ processResponse(exchange, jobInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processCloseJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ JobInfo jobBody;
+ String jobId;
+ jobBody = exchange.getIn().getBody(JobInfo.class);
+ if (jobBody != null) {
+ jobId = jobBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.closeJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+ @Override
+ public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+ processResponse(exchange, jobInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processAbortJob(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ JobInfo jobBody;
+ String jobId;
+ jobBody = exchange.getIn().getBody(JobInfo.class);
+ if (jobBody != null) {
+ jobId = jobBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.abortJob(jobId, new BulkApiClient.JobInfoResponseCallback() {
+ @Override
+ public void onResponse(JobInfo jobInfo, SalesforceException ex) {
+ processResponse(exchange, jobInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processCreateBatch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ // since request is in the body, use headers or endpoint params
+ ContentType contentType = ContentType.fromValue(
+ getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+
+ InputStream request;
+ try {
+ request = exchange.getIn().getMandatoryBody(InputStream.class);
+ } catch (CamelException e) {
+ String msg = "Error preparing batch request: " + e.getMessage();
+ throw new SalesforceException(msg, e);
+ }
+
+ bulkClient.createBatch(request, jobId, contentType, new BulkApiClient.BatchInfoResponseCallback() {
+ @Override
+ public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+ processResponse(exchange, batchInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processGetBatch(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ BatchInfo batchBody = exchange.getIn().getBody(BatchInfo.class);
+ String batchId;
+ if (batchBody != null) {
+ jobId = batchBody.getJobId();
+ batchId = batchBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getBatch(jobId, batchId, new BulkApiClient.BatchInfoResponseCallback() {
+ @Override
+ public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+ processResponse(exchange, batchInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processGetAllBatches(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ JobInfo jobBody;
+ String jobId;
+ jobBody = exchange.getIn().getBody(JobInfo.class);
+ if (jobBody != null) {
+ jobId = jobBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getAllBatches(jobId, new BulkApiClient.BatchInfoListResponseCallback() {
+ @Override
+ public void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex) {
+ processResponse(exchange, batchInfoList, ex, callback);
+ }
+ });
+ }
+
+ private void processGetRequest(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ BatchInfo batchBody;
+ String batchId;
+ batchBody = exchange.getIn().getBody(BatchInfo.class);
+ if (batchBody != null) {
+ jobId = batchBody.getJobId();
+ batchId = batchBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+
+ bulkClient.getRequest(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+ @Override
+ public void onResponse(InputStream inputStream, SalesforceException ex) {
+ // read the request stream into a StreamCache temp file
+ // ensures the connection is read
+ StreamCache body = null;
+ if (inputStream != null) {
+ try {
+ body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+ } catch (IOException e) {
+ String msg = "Error retrieving batch request: " + e.getMessage();
+ ex = new SalesforceException(msg, e);
+ } finally {
+ // close the input stream to release the Http connection
+ try {
+ inputStream.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ processResponse(exchange, body, ex, callback);
+ }
+ });
+ }
+
+ private void processGetResults(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ BatchInfo batchBody;
+ String batchId;
+ batchBody = exchange.getIn().getBody(BatchInfo.class);
+ if (batchBody != null) {
+ jobId = batchBody.getJobId();
+ batchId = batchBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getResults(jobId, batchId, new BulkApiClient.StreamResponseCallback() {
+ @Override
+ public void onResponse(InputStream inputStream, SalesforceException ex) {
+ // read the result stream into a StreamCache temp file
+ // ensures the connection is read
+ StreamCache body = null;
+ if (inputStream != null) {
+ try {
+ body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+ } catch (IOException e) {
+ String msg = "Error retrieving batch results: " + e.getMessage();
+ ex = new SalesforceException(msg, e);
+ } finally {
+ // close the input stream to release the Http connection
+ try {
+ inputStream.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ processResponse(exchange, body, ex, callback);
+ }
+ });
+ }
+
+ private void processCreateBatchQuery(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ JobInfo jobBody;
+ String jobId;
+ ContentType contentType;
+ jobBody = exchange.getIn().getBody(JobInfo.class);
+ String soqlQuery;
+ if (jobBody != null) {
+ jobId = jobBody.getId();
+ contentType = jobBody.getContentType();
+ // use SOQL query from header or endpoint config
+ soqlQuery = getParameter(SOBJECT_QUERY, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ contentType = ContentType.fromValue(
+ getParameter(CONTENT_TYPE, exchange, IGNORE_BODY, NOT_OPTIONAL));
+ // reuse SOBJECT_QUERY property
+ soqlQuery = getParameter(SOBJECT_QUERY, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.createBatchQuery(jobId, soqlQuery, contentType,
+ new BulkApiClient.BatchInfoResponseCallback() {
+ @Override
+ public void onResponse(BatchInfo batchInfo, SalesforceException ex) {
+ processResponse(exchange, batchInfo, ex, callback);
+ }
+ });
+ }
+
+ private void processGetQueryResultIds(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ BatchInfo batchBody;
+ String batchId;
+ batchBody = exchange.getIn().getBody(BatchInfo.class);
+ if (batchBody != null) {
+ jobId = batchBody.getJobId();
+ batchId = batchBody.getId();
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ batchId = getParameter(BATCH_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getQueryResultIds(jobId, batchId, new BulkApiClient.QueryResultIdsCallback() {
+ @Override
+ public void onResponse(List<String> ids, SalesforceException ex) {
+ processResponse(exchange, ids, ex, callback);
+ }
+ });
+ }
+
+ private void processGetQueryResult(final Exchange exchange, final AsyncCallback callback) throws SalesforceException {
+ String jobId;
+ BatchInfo batchBody;
+ String batchId;
+ batchBody = exchange.getIn().getBody(BatchInfo.class);
+ String resultId;
+ if (batchBody != null) {
+ jobId = batchBody.getJobId();
+ batchId = batchBody.getId();
+ resultId = getParameter(RESULT_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ } else {
+ jobId = getParameter(JOB_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ batchId = getParameter(BATCH_ID, exchange, IGNORE_BODY, NOT_OPTIONAL);
+ resultId = getParameter(RESULT_ID, exchange, USE_BODY, NOT_OPTIONAL);
+ }
+ bulkClient.getQueryResult(jobId, batchId, resultId, new BulkApiClient.StreamResponseCallback() {
+ @Override
+ public void onResponse(InputStream inputStream, SalesforceException ex) {
+ StreamCache body = null;
+ if (inputStream != null) {
+ // read the result stream into a StreamCache temp file
+ // ensures the connection is read
+ try {
+ body = StreamCacheConverter.convertToStreamCache(inputStream, exchange);
+ } catch (IOException e) {
+ String msg = "Error retrieving query result: " + e.getMessage();
+ ex = new SalesforceException(msg, e);
+ } finally {
+ // close the input stream to release the Http connection
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ processResponse(exchange, body, ex, callback);
+ }
+ });
+ }
+
private void processResponse(Exchange exchange, Object body, SalesforceException ex, AsyncCallback callback) {
final Message out = exchange.getOut();
if (ex != null) {