You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/05 17:12:52 UTC

[08/11] CAMEL-6428: camel-salesforce component. Thanks to Dhiraj Bokde for the contribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
new file mode 100644
index 0000000..a16a2a3
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal;
+
+import org.apache.camel.Service;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.UrlEncoded;
+import org.apache.camel.component.salesforce.SalesforceLoginConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.internal.dto.LoginError;
+import org.apache.camel.component.salesforce.internal.dto.LoginToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class SalesforceSession implements Service {
+
+    private static final String OAUTH2_REVOKE_PATH = "/services/oauth2/revoke?token=";
+    private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token";
+
+    private static final Logger LOG = LoggerFactory.getLogger(SalesforceSession.class);
+    private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded;charset=utf-8";
+
+    private final HttpClient httpClient;
+
+    private final SalesforceLoginConfig config;
+
+    private final ObjectMapper objectMapper;
+    private final Set<SalesforceSessionListener> listeners;
+
+    private String accessToken;
+    private String instanceUrl;
+
+    public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig config) {
+        // validate parameters
+        assertNotNull("Null httpClient", httpClient);
+        assertNotNull("Null SalesforceLoginConfig", config);
+        assertNotNull("Null loginUrl", config.getLoginUrl());
+        assertNotNull("Null clientId", config.getClientId());
+        assertNotNull("Null clientSecret", config.getClientSecret());
+        assertNotNull("Null userName", config.getUserName());
+        assertNotNull("Null password", config.getPassword());
+
+        this.httpClient = httpClient;
+        this.config = config;
+
+        // strip trailing '/'
+        String loginUrl = config.getLoginUrl();
+        config.setLoginUrl(loginUrl.endsWith("/") ? loginUrl.substring(0, loginUrl.length() - 1) : loginUrl);
+
+        this.objectMapper = new ObjectMapper();
+        this.listeners = new CopyOnWriteArraySet<SalesforceSessionListener>();
+    }
+
+    private void assertNotNull(String s, Object o) {
+        if (o == null) {
+            throw new IllegalArgumentException(s);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized String login(String oldToken) throws SalesforceException {
+
+        // check if we need a new session
+        // this way there's always a single valid session
+        if ((accessToken == null) || accessToken.equals(oldToken)) {
+
+            // try revoking the old access token before creating a new one
+            accessToken = oldToken;
+            if (accessToken != null) {
+                try {
+                    logout();
+                } catch (SalesforceException e) {
+                    LOG.warn("Error revoking old access token: " + e.getMessage(), e);
+                }
+                accessToken = null;
+            }
+
+            // login to Salesforce and get session id
+            final StatusExceptionExchange loginPost = new StatusExceptionExchange(true);
+            loginPost.setURL(config.getLoginUrl() + OAUTH2_TOKEN_PATH);
+            loginPost.setMethod(HttpMethods.POST);
+            loginPost.setRequestContentType(FORM_CONTENT_TYPE);
+
+            final UrlEncoded nvps = new UrlEncoded();
+            nvps.put("grant_type", "password");
+            nvps.put("client_id", config.getClientId());
+            nvps.put("client_secret", config.getClientSecret());
+            nvps.put("username", config.getUserName());
+            nvps.put("password", config.getPassword());
+            nvps.put("format", "json");
+
+            try {
+
+                // set form content
+                loginPost.setRequestContent(new ByteArrayBuffer(
+                    nvps.encode(StringUtil.__UTF8, true).getBytes(StringUtil.__UTF8)));
+                httpClient.send(loginPost);
+
+                // wait for the login to finish
+                final int exchangeState = loginPost.waitForDone();
+
+                switch (exchangeState) {
+                    case HttpExchange.STATUS_COMPLETED:
+                        final byte[] responseContent = loginPost.getResponseContentBytes();
+                        final int responseStatus = loginPost.getResponseStatus();
+                        switch (responseStatus) {
+
+                            case HttpStatus.OK_200:
+                                // parse the response to get token
+                                LoginToken token = objectMapper.readValue(responseContent,
+                                    LoginToken.class);
+
+                                // don't log token or instance URL for security reasons
+                                LOG.info("Login successful");
+                                accessToken = token.getAccessToken();
+                                instanceUrl = token.getInstanceUrl();
+
+                                // notify all listeners
+                                for (SalesforceSessionListener listener : listeners) {
+                                    try {
+                                        listener.onLogin(accessToken, instanceUrl);
+                                    } catch (Throwable t) {
+                                        LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage());
+                                    }
+                                }
+
+                                break;
+
+                            case HttpStatus.BAD_REQUEST_400:
+                                // parse the response to get error
+                                final LoginError error = objectMapper.readValue(responseContent,
+                                    LoginError.class);
+                                final String msg = String.format("Login error code:[%s] description:[%s]",
+                                    error.getError(), error.getErrorDescription());
+                                final List<RestError> errors = new ArrayList<RestError>();
+                                errors.add(new RestError(msg, error.getErrorDescription()));
+                                throw new SalesforceException(errors, HttpStatus.BAD_REQUEST_400);
+
+                            default:
+                                throw new SalesforceException(
+                                    String.format("Login error status:[%s] reason:[%s]",
+                                        responseStatus, loginPost.getReason()),
+                                    responseStatus);
+                        }
+                        break;
+
+                    case HttpExchange.STATUS_EXCEPTED:
+                        final Throwable ex = loginPost.getException();
+                        throw new SalesforceException(
+                            String.format("Unexpected login exception: %s", ex.getMessage()),
+                            ex);
+
+                    case HttpExchange.STATUS_CANCELLED:
+                        throw new SalesforceException("Login request CANCELLED!", null);
+
+                    case HttpExchange.STATUS_EXPIRED:
+                        throw new SalesforceException("Login request TIMEOUT!", null);
+
+                }
+
+            } catch (IOException e) {
+                String msg = "Login error: unexpected exception " + e.getMessage();
+                throw new SalesforceException(msg, e);
+            } catch (InterruptedException e) {
+                String msg = "Login error: unexpected exception " + e.getMessage();
+                throw new SalesforceException(msg, e);
+            }
+        }
+
+        return accessToken;
+    }
+
+    public void logout() throws SalesforceException {
+        if (accessToken == null) {
+            return;
+        }
+
+        StatusExceptionExchange logoutGet = new StatusExceptionExchange(true);
+        logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH + accessToken);
+        logoutGet.setMethod(HttpMethods.GET);
+
+        try {
+            httpClient.send(logoutGet);
+            final int done = logoutGet.waitForDone();
+            switch (done) {
+
+                case HttpExchange.STATUS_COMPLETED:
+                    final int statusCode = logoutGet.getResponseStatus();
+                    final String reason = logoutGet.getReason();
+
+                    if (statusCode == HttpStatus.OK_200) {
+                        LOG.info("Logout successful");
+                    } else {
+                        throw new SalesforceException(
+                            String.format("Logout error, code: [%s] reason: [%s]",
+                                statusCode, reason),
+                            statusCode);
+                    }
+                    break;
+
+                case HttpExchange.STATUS_EXCEPTED:
+                    final Throwable ex = logoutGet.getException();
+                    throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex);
+
+                case HttpExchange.STATUS_CANCELLED:
+                    throw new SalesforceException("Logout request CANCELLED!", null);
+
+                case HttpExchange.STATUS_EXPIRED:
+                    throw new SalesforceException("Logout request TIMEOUT!", null);
+
+            }
+
+        } catch (SalesforceException e) {
+            throw e;
+        } catch (Exception e) {
+            String msg = "Logout error: " + e.getMessage();
+            throw new SalesforceException(msg, e);
+        } finally {
+            // reset session
+            accessToken = null;
+            instanceUrl = null;
+            // notify all session listeners of the new access token and instance url
+            for (SalesforceSessionListener listener : listeners) {
+                try {
+                    listener.onLogout();
+                } catch (Throwable t) {
+                    LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage());
+                }
+            }
+        }
+    }
+
+    public String getAccessToken() {
+        return accessToken;
+    }
+
+    public String getInstanceUrl() {
+        return instanceUrl;
+    }
+
+    public boolean addListener(SalesforceSessionListener listener) {
+        return listeners.add(listener);
+    }
+
+    public boolean removeListener(SalesforceSessionListener listener) {
+        return listeners.remove(listener);
+    }
+
+    @Override
+    public void start() throws Exception {
+        // auto-login at start if needed
+        login(accessToken);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // logout
+        logout();
+    }
+
+    /**
+     * Records status line, and exception from exchange.
+     *
+     * @author dbokde
+     */
+    private static class StatusExceptionExchange extends ContentExchange {
+
+        private String reason;
+        private Throwable exception;
+
+        public StatusExceptionExchange(boolean cacheFields) {
+            super(cacheFields);
+        }
+
+        @Override
+        protected synchronized void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+            // remember reason
+            this.reason = reason.toString(StringUtil.__ISO_8859_1);
+            super.onResponseStatus(version, status, reason);
+        }
+
+        @Override
+        protected void onConnectionFailed(Throwable x) {
+            this.exception = x;
+            super.onConnectionFailed(x);
+        }
+
+        @Override
+        protected void onException(Throwable x) {
+            this.exception = x;
+            super.onException(x);
+        }
+
+        public String getReason() {
+            return reason;
+        }
+
+        public Throwable getException() {
+            return exception;
+        }
+
+    }
+
+    public static interface SalesforceSessionListener {
+        void onLogin(String accessToken, String instanceUrl);
+        void onLogout();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
new file mode 100644
index 0000000..6fe7028
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.Service;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpEventListenerWrapper;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpSchemes;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class AbstractClientBase implements SalesforceSession.SalesforceSessionListener, Service {
+
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    protected static final String APPLICATION_JSON_UTF8 = "application/json;charset=utf-8";
+    protected static final String APPLICATION_XML_UTF8 = "application/xml;charset=utf-8";
+
+    protected final HttpClient httpClient;
+    protected final SalesforceSession session;
+    protected final String version;
+
+    protected String accessToken;
+    protected String instanceUrl;
+
+    public AbstractClientBase(String version,
+                              SalesforceSession session, HttpClient httpClient) throws SalesforceException {
+
+        this.version = version;
+        this.session = session;
+        this.httpClient = httpClient;
+    }
+
+    public void start() throws Exception {
+        // local cache
+        accessToken = session.getAccessToken();
+        if (accessToken == null) {
+            // lazy login here!
+            accessToken = session.login(accessToken);
+        }
+        instanceUrl = session.getInstanceUrl();
+
+        // also register this client as a session listener
+        session.addListener(this);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // deregister listener
+        session.removeListener(this);
+    }
+
+    @Override
+    public void onLogin(String accessToken, String instanceUrl) {
+        if (!accessToken.equals(this.accessToken)) {
+            this.accessToken = accessToken;
+            this.instanceUrl = instanceUrl;
+        }
+    }
+
+    @Override
+    public void onLogout() {
+        // ignore, if this client makes another request with stale token,
+        // SalesforceSecurityListener will auto login!
+    }
+
+    protected SalesforceExchange getContentExchange(String method, String url) {
+        SalesforceExchange get = new SalesforceExchange();
+        get.setMethod(method);
+        get.setURL(url);
+        get.setClient(this);
+        return get;
+    }
+
+    protected interface ClientResponseCallback {
+        void onResponse(InputStream response, SalesforceException ex);
+    }
+
+    protected void doHttpRequest(final ContentExchange request, final ClientResponseCallback callback) {
+
+        // use SalesforceSecurityListener for security login retries
+        try {
+            final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
+            request.setEventListener(new SalesforceSecurityListener(
+                httpClient.getDestination(request.getAddress(), isHttps),
+                request, session, accessToken));
+        } catch (IOException e) {
+            // propagate exception
+            callback.onResponse(null, new SalesforceException(
+                String.format("Error registering security listener: %s", e.getMessage()),
+                e));
+        }
+
+        // use HttpEventListener for lifecycle events
+        request.setEventListener(new HttpEventListenerWrapper(request.getEventListener(), true) {
+
+            public String reason;
+
+            @Override
+            public void onConnectionFailed(Throwable ex) {
+                super.onConnectionFailed(ex);
+                callback.onResponse(null,
+                    new SalesforceException("Connection error: " + ex.getMessage(), ex));
+            }
+
+            @Override
+            public void onException(Throwable ex) {
+                super.onException(ex);
+                callback.onResponse(null,
+                    new SalesforceException("Unexpected exception: " + ex.getMessage(), ex));
+            }
+
+            @Override
+            public void onExpire() {
+                super.onExpire();
+                callback.onResponse(null,
+                    new SalesforceException("Request expired", null));
+            }
+
+            @Override
+            public void onResponseComplete() throws IOException {
+                super.onResponseComplete();
+
+                final int responseStatus = request.getResponseStatus();
+                if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) {
+                    final String msg = String.format("Error {%s:%s} executing {%s:%s}",
+                        responseStatus, reason, request.getMethod(), request.getRequestURI());
+                    final SalesforceException exception = new SalesforceException(msg, createRestException(request));
+                    exception.setStatusCode(responseStatus);
+                    callback.onResponse(null, exception);
+                } else {
+                    // TODO not memory efficient for large response messages,
+                    // doesn't seem to be possible in Jetty 7 to directly stream to response parsers
+                    final byte[] bytes = request.getResponseContentBytes();
+                    callback.onResponse(bytes != null ? new ByteArrayInputStream(bytes) : null, null);
+                }
+
+            }
+
+            @Override
+            public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+                super.onResponseStatus(version, status, reason);
+                // remember status reason
+                this.reason = reason.toString(StringUtil.__ISO_8859_1);
+            }
+        });
+
+        // execute the request
+        try {
+            httpClient.send(request);
+        } catch (IOException e) {
+            String msg = "Unexpected Error: " + e.getMessage();
+            // send error through callback
+            callback.onResponse(null, new SalesforceException(msg, e));
+        }
+
+    }
+
+    public void setAccessToken(String accessToken) {
+        this.accessToken = accessToken;
+    }
+
+    public void setInstanceUrl(String instanceUrl) {
+        this.instanceUrl = instanceUrl;
+    }
+
+    protected abstract void setAccessToken(HttpExchange httpExchange);
+
+    protected abstract SalesforceException createRestException(ContentExchange httpExchange);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
new file mode 100644
index 0000000..b00e3fd
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Client interface for Salesforce Bulk API
+ */
+public interface BulkApiClient {
+
+    public static interface JobInfoResponseCallback {
+        void onResponse(JobInfo jobInfo, SalesforceException ex);
+    }
+
+    public static interface BatchInfoResponseCallback {
+        void onResponse(BatchInfo batchInfo, SalesforceException ex);
+    }
+
+    public static interface BatchInfoListResponseCallback {
+        void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex);
+    }
+
+    public static interface StreamResponseCallback {
+        void onResponse(InputStream inputStream, SalesforceException ex);
+    }
+
+    public static interface QueryResultIdsCallback {
+        void onResponse(List<String> ids, SalesforceException ex);
+    }
+
+    /**
+     * Creates a Bulk Job
+     *
+     * @param jobInfo {@link JobInfo} with required fields
+     * @param callback {@link JobInfoResponseCallback} to be invoked on response or error
+     */
+    void createJob(JobInfo jobInfo,
+                   JobInfoResponseCallback callback);
+
+    void getJob(String jobId,
+                JobInfoResponseCallback callback);
+
+    void closeJob(String jobId,
+                  JobInfoResponseCallback callback);
+
+    void abortJob(String jobId,
+                  JobInfoResponseCallback callback);
+
+    void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum,
+                     BatchInfoResponseCallback callback);
+
+    void getBatch(String jobId, String batchId,
+                  BatchInfoResponseCallback callback);
+
+    void getAllBatches(String jobId,
+                       BatchInfoListResponseCallback callback);
+
+    void getRequest(String jobId, String batchId,
+                    StreamResponseCallback callback);
+
+    void getResults(String jobId, String batchId,
+                    StreamResponseCallback callback);
+
+    void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType,
+                          BatchInfoResponseCallback callback);
+
+    void getQueryResultIds(String jobId, String batchId,
+                           QueryResultIdsCallback callback);
+
+    void getQueryResult(String jobId, String batchId, String resultId,
+                        StreamResponseCallback callback);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
new file mode 100644
index 0000000..b4899e5
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.api.dto.bulk.Error;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+
+import javax.xml.bind.*;
+import javax.xml.transform.stream.StreamSource;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient {
+
+    private static final String TOKEN_HEADER = "X-SFDC-Session";
+
+    private JAXBContext context;
+    private static final ContentType DEFAULT_ACCEPT_TYPE = ContentType.XML;
+    private ObjectFactory objectFactory;
+
+    public DefaultBulkApiClient(String version,
+                                SalesforceSession session, HttpClient httpClient) throws SalesforceException {
+        super(version, session, httpClient);
+
+        try {
+            context = JAXBContext.newInstance(JobInfo.class.getPackage().getName(), getClass().getClassLoader());
+        } catch (JAXBException e) {
+            String msg = "Error loading Bulk API DTOs: " + e.getMessage();
+            throw new IllegalArgumentException(msg, e);
+        }
+
+        this.objectFactory = new ObjectFactory();
+    }
+
+    @Override
+    public void createJob(JobInfo request, final JobInfoResponseCallback callback) {
+
+        // clear system fields if set
+        sanitizeJobRequest(request);
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(null));
+        try {
+            marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+        } catch (SalesforceException e) {
+            callback.onResponse(null, e);
+            return;
+        }
+
+        // make the call and parse the result in callback
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                JobInfo value = null;
+                if (response != null) {
+                    try {
+                        value = unmarshalResponse(response, post, JobInfo.class);
+                    } catch (SalesforceException e) {
+                        ex = e;
+                    }
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    // reset read only fields
+    private void sanitizeJobRequest(JobInfo request) {
+        request.setApexProcessingTime(null);
+        request.setApiActiveProcessingTime(null);
+        request.setApiVersion(null);
+        request.setCreatedById(null);
+        request.setCreatedDate(null);
+        request.setId(null);
+        request.setNumberBatchesCompleted(null);
+        request.setNumberBatchesFailed(null);
+        request.setNumberBatchesInProgress(null);
+        request.setNumberBatchesQueued(null);
+        request.setNumberBatchesTotal(null);
+        request.setNumberRecordsFailed(null);
+        request.setNumberRecordsProcessed(null);
+        request.setNumberRetries(null);
+        request.setState(null);
+        request.setSystemModstamp(null);
+        request.setSystemModstamp(null);
+    }
+
+    @Override
+    public void getJob(String jobId, final JobInfoResponseCallback callback) {
+
+        final ContentExchange get = getContentExchange(HttpMethods.GET, jobUrl(jobId));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                JobInfo value = null;
+                try {
+                    value = unmarshalResponse(response, get, JobInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void closeJob(String jobId, final JobInfoResponseCallback callback) {
+        final JobInfo request = new JobInfo();
+        request.setState(JobStateEnum.CLOSED);
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId));
+        try {
+            marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+        } catch (SalesforceException e) {
+            callback.onResponse(null, e);
+            return;
+        }
+
+        // make the call and parse the result
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                JobInfo value = null;
+                try {
+                    value = unmarshalResponse(response, post, JobInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void abortJob(String jobId, final JobInfoResponseCallback callback) {
+        final JobInfo request = new JobInfo();
+        request.setState(JobStateEnum.ABORTED);
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId));
+        try {
+            marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+        } catch (SalesforceException e) {
+            callback.onResponse(null, e);
+            return;
+        }
+
+        // make the call and parse the result
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                JobInfo value = null;
+                try {
+                    value = unmarshalResponse(response, post, JobInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum,
+                            final BatchInfoResponseCallback callback) {
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null));
+        post.setRequestContentSource(batchStream);
+        post.setRequestContentType(getContentType(contentTypeEnum) + ";charset=" + StringUtil.__UTF8);
+
+        // make the call and parse the result
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                BatchInfo value = null;
+                try {
+                    value = unmarshalResponse(response, post, BatchInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getBatch(String jobId, String batchId,
+                         final BatchInfoResponseCallback callback) {
+
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                BatchInfo value = null;
+                try {
+                    value = unmarshalResponse(response, get, BatchInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getAllBatches(String jobId,
+                              final BatchInfoListResponseCallback callback) {
+
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, null));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                BatchInfoList value = null;
+                try {
+                    value = unmarshalResponse(response, get, BatchInfoList.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value != null ? value.getBatchInfo() : null, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getRequest(String jobId, String batchId,
+                           final StreamResponseCallback callback) {
+
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                callback.onResponse(response, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getResults(String jobId, String batchId,
+                           final StreamResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null));
+
+        // make the call and return the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                callback.onResponse(response, ex);
+            }
+        });
+    }
+
+    @Override
+    public void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType,
+                                 final BatchInfoResponseCallback callback) {
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null));
+        byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET);
+        post.setRequestContent(new ByteArrayBuffer(queryBytes));
+        post.setRequestContentType(getContentType(jobContentType) + ";charset=" + StringUtil.__UTF8);
+
+        // make the call and parse the result
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                BatchInfo value = null;
+                try {
+                    value = unmarshalResponse(response, post, BatchInfo.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getQueryResultIds(String jobId, String batchId,
+                                  final QueryResultIdsCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                QueryResultList value = null;
+                try {
+                    value = unmarshalResponse(response, get, QueryResultList.class);
+                } catch (SalesforceException e) {
+                    ex = e;
+                }
+                callback.onResponse(value != null ? Collections.unmodifiableList(value.getResult()) : null,
+                    ex);
+            }
+        });
+
+    }
+
+    @Override
+    public void getQueryResult(String jobId, String batchId, String resultId,
+                               final StreamResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, resultId));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                callback.onResponse(response, ex);
+            }
+        });
+
+    }
+
+    @Override
+    protected void setAccessToken(HttpExchange httpExchange) {
+        httpExchange.setRequestHeader(TOKEN_HEADER, accessToken);
+    }
+
+    @Override
+    protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) {
+        // set access token for all requests
+        setAccessToken(request);
+
+        // set default charset
+        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8);
+
+        // TODO check if this is really needed or not, since SF response content type seems fixed
+        // check if the default accept content type must be used
+        if (!request.getRequestFields().containsKey(HttpHeaders.ACCEPT)) {
+            final String contentType = getContentType(DEFAULT_ACCEPT_TYPE);
+            request.setRequestHeader(HttpHeaders.ACCEPT, contentType);
+            // request content type and charset is set by the request entity
+        }
+
+        super.doHttpRequest(request, callback);
+    }
+
+    private static String getContentType(ContentType type) {
+        String result = null;
+
+        switch (type) {
+            case CSV:
+                result = "text/csv";
+                break;
+
+            case XML:
+                result = "application/xml";
+                break;
+
+            case ZIP_CSV:
+            case ZIP_XML:
+                result = type.toString().toLowerCase().replace('_', '/');
+                break;
+        }
+
+        return result;
+    }
+
+    @Override
+    protected SalesforceException createRestException(ContentExchange request) {
+        // this must be of type Error
+        try {
+            final Error error = unmarshalResponse(new ByteArrayInputStream(request.getResponseContentBytes()),
+                request, Error.class);
+
+            final RestError restError = new RestError();
+            restError.setErrorCode(error.getExceptionCode());
+            restError.setMessage(error.getExceptionMessage());
+
+            return new SalesforceException(Arrays.asList(restError), request.getResponseStatus());
+        } catch (SalesforceException e) {
+            String msg = "Error un-marshaling Salesforce Error: " + e.getMessage();
+            return new SalesforceException(msg, e);
+        }
+    }
+
+    private <T> T unmarshalResponse(InputStream response, ContentExchange request, Class<T> resultClass)
+        throws SalesforceException {
+        try {
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            JAXBElement<T> result = unmarshaller.unmarshal(new StreamSource(response), resultClass);
+            return result.getValue();
+        } catch (JAXBException e) {
+            throw new SalesforceException(
+                String.format("Error unmarshaling response {%s:%s} : %s",
+                    request.getMethod(), request.getRequestURI(), e.getMessage()),
+                e);
+        } catch (IllegalArgumentException e) {
+            throw new SalesforceException(
+                String.format("Error unmarshaling response for {%s:%s} : %s",
+                    request.getMethod(), request.getRequestURI(), e.getMessage()),
+                e);
+        }
+    }
+
+    private void marshalRequest(Object input, ContentExchange request, String contentType)
+        throws SalesforceException {
+        try {
+            Marshaller marshaller = context.createMarshaller();
+            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+            marshaller.marshal(input, byteStream);
+            request.setRequestContent(new ByteArrayBuffer(byteStream.toByteArray()));
+            request.setRequestContentType(contentType);
+        } catch (JAXBException e) {
+            throw new SalesforceException(
+                String.format("Error marshaling request for {%s:%s} : %s",
+                    request.getMethod(), request.getRequestURI(), e.getMessage()),
+                e);
+        } catch (IllegalArgumentException e) {
+            throw new SalesforceException(
+                String.format("Error marshaling request for {%s:%s} : %s",
+                    request.getMethod(), request.getRequestURI(), e.getMessage()),
+                e);
+        }
+    }
+
+    private String jobUrl(String jobId) {
+        if (jobId != null) {
+            return super.instanceUrl + "/services/async/" + version + "/job/" + jobId;
+        } else {
+            return super.instanceUrl + "/services/async/" + version + "/job";
+        }
+    }
+
+    private String batchUrl(String jobId, String batchId) {
+        if (batchId != null) {
+            return jobUrl(jobId) + "/batch/" + batchId;
+        } else {
+            return jobUrl(jobId) + "/batch";
+        }
+    }
+
+    private String batchResultUrl(String jobId, String batchId, String resultId) {
+        if (resultId != null) {
+            return batchUrl(jobId, batchId) + "/result/" + resultId;
+        } else {
+            return batchUrl(jobId, batchId) + "/result";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
new file mode 100644
index 0000000..07f5af2
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import com.thoughtworks.xstream.XStream;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.component.salesforce.internal.dto.RestErrors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.List;
+
+public class DefaultRestClient extends AbstractClientBase implements RestClient {
+
+    private static final String SERVICES_DATA = "/services/data/";
+    private static final String TOKEN_HEADER = "Authorization";
+    private static final String TOKEN_PREFIX = "Bearer ";
+
+    private ObjectMapper objectMapper;
+    private XStream xStream;
+    protected String format;
+
+    public DefaultRestClient(HttpClient httpClient,
+                             String version, String format, SalesforceSession session) throws SalesforceException {
+        super(version, session, httpClient);
+
+        this.format = format;
+
+        // initialize error parsers for JSON and XML
+        this.objectMapper = new ObjectMapper();
+        this.xStream = new XStream();
+        xStream.processAnnotations(RestErrors.class);
+    }
+
+    @Override
+    protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) {
+        // set standard headers for all requests
+        final String contentType = "json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8;
+        request.setRequestHeader(HttpHeaders.ACCEPT, contentType);
+        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8);
+        // request content type and charset is set by the request entity
+
+        super.doHttpRequest(request, callback);
+    }
+
+    @Override
+    protected SalesforceException createRestException(ContentExchange httpExchange) {
+        // try parsing response according to format
+        try {
+            if ("json".equals(format)) {
+                List<RestError> restErrors = objectMapper.readValue(
+                    httpExchange.getResponseContent(), new TypeReference<List<RestError>>() {
+                });
+                return new SalesforceException(restErrors, httpExchange.getResponseStatus());
+            } else {
+                RestErrors errors = new RestErrors();
+                xStream.fromXML(httpExchange.getResponseContent(), errors);
+                return new SalesforceException(errors.getErrors(), httpExchange.getResponseStatus());
+            }
+        } catch (IOException e) {
+            // log and ignore
+            String msg = "Unexpected Error parsing " + format + " error response: " + e.getMessage();
+            LOG.warn(msg, e);
+        } catch (RuntimeException e) {
+            // log and ignore
+            String msg = "Unexpected Error parsing " + format + " error response: " + e.getMessage();
+            LOG.warn(msg, e);
+        }
+
+        // just report HTTP status info
+        return new SalesforceException("Unexpected error", httpExchange.getResponseStatus());
+    }
+
+    @Override
+    public void getVersions(final ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, servicesDataUrl());
+        // does not require authorization token
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getResources(ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl());
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getGlobalObjects(ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(""));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getBasicInfo(String sObjectName,
+                             ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/"));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getDescription(String sObjectName,
+                               ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/describe/"));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getSObject(String sObjectName, String id, String[] fields,
+                           ResponseCallback callback) {
+
+        // parse fields if set
+        String params = "";
+        if (fields != null && fields.length > 0) {
+            StringBuilder fieldsValue = new StringBuilder("?fields=");
+            for (int i = 0; i < fields.length; i++) {
+                fieldsValue.append(fields[i]);
+                if (i < (fields.length - 1)) {
+                    fieldsValue.append(',');
+                }
+            }
+            params = fieldsValue.toString();
+        }
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/" + id + params));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void createSObject(String sObjectName, InputStream sObject,
+                              ResponseCallback callback) {
+        // post the sObject
+        final ContentExchange post = getContentExchange(HttpMethods.POST, sobjectsUrl(sObjectName));
+
+        // authorization
+        setAccessToken(post);
+
+        // input stream as entity content
+        post.setRequestContentSource(sObject);
+        post.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8);
+
+        doHttpRequest(post, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void updateSObject(String sObjectName, String id, InputStream sObject,
+                              ResponseCallback callback) {
+        final ContentExchange patch = getContentExchange("PATCH", sobjectsUrl(sObjectName + "/" + id));
+        // requires authorization token
+        setAccessToken(patch);
+
+        // input stream as entity content
+        patch.setRequestContentSource(sObject);
+        patch.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8);
+
+        doHttpRequest(patch, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void deleteSObject(String sObjectName, String id,
+                              ResponseCallback callback) {
+        final ContentExchange delete = getContentExchange(HttpMethods.DELETE, sobjectsUrl(sObjectName + "/" + id));
+
+        // requires authorization token
+        setAccessToken(delete);
+
+        doHttpRequest(delete, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getSObjectWithId(String sObjectName, String fieldName, String fieldValue,
+                                 ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET,
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void upsertSObject(String sObjectName, String fieldName, String fieldValue, InputStream sObject,
+                              ResponseCallback callback) {
+        final ContentExchange patch = getContentExchange("PATCH",
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(patch);
+
+        // input stream as entity content
+        patch.setRequestContentSource(sObject);
+        // TODO will the encoding always be UTF-8??
+        patch.setRequestContentType("json".equals(format) ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8);
+
+        doHttpRequest(patch, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void deleteSObjectWithId(String sObjectName, String fieldName, String fieldValue,
+                                    ResponseCallback callback) {
+        final ContentExchange delete = getContentExchange(HttpMethods.DELETE,
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(delete);
+
+        doHttpRequest(delete, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET,
+            sobjectsUrl(sObjectName + "/" + id +"/" + blobFieldName));
+        // TODO this doesn't seem to be required, the response is always the content binary stream
+        //get.setRequestHeader(HttpHeaders.ACCEPT_ENCODING, "base64");
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void query(String soqlQuery, ResponseCallback callback) {
+        try {
+
+            String encodedQuery = URLEncoder.encode(soqlQuery, StringUtil.__UTF8_CHARSET.toString());
+            // URLEncoder likes to use '+' for spaces
+            encodedQuery = encodedQuery.replace("+", "%20");
+            final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "query/?q=" + encodedQuery);
+
+            // requires authorization token
+            setAccessToken(get);
+
+            doHttpRequest(get, new DelegatingClientCallback(callback));
+
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            callback.onResponse(null, new SalesforceException(msg, e));
+        }
+    }
+
+    @Override
+    public void queryMore(String nextRecordsUrl, ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, instanceUrl + nextRecordsUrl);
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void search(String soslQuery, ResponseCallback callback) {
+        try {
+
+            String encodedQuery = URLEncoder.encode(soslQuery, StringUtil.__UTF8_CHARSET.toString());
+            // URLEncoder likes to use '+' for spaces
+            encodedQuery = encodedQuery.replace("+", "%20");
+            final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "search/?q=" + encodedQuery);
+
+            // requires authorization token
+            setAccessToken(get);
+
+            doHttpRequest(get, new DelegatingClientCallback(callback));
+
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            callback.onResponse(null, new SalesforceException(msg, e));
+        }
+    }
+
+    private String servicesDataUrl() {
+        return instanceUrl + SERVICES_DATA;
+    }
+
+    private String versionUrl() {
+        if (version == null) {
+            throw new IllegalArgumentException("NULL API version", new NullPointerException("version"));
+        }
+        return servicesDataUrl() + "v" + version + "/";
+    }
+
+    private String sobjectsUrl(String sObjectName) {
+        if (sObjectName == null) {
+            throw new IllegalArgumentException("Null SObject name", new NullPointerException("sObjectName"));
+        }
+        return versionUrl() + "sobjects/" + sObjectName;
+    }
+
+    private String sobjectsExternalIdUrl(String sObjectName, String fieldName, String fieldValue) {
+        if (fieldName == null || fieldValue == null) {
+            throw new IllegalArgumentException("External field name and value cannot be NULL");
+        }
+        try {
+            String encodedValue = URLEncoder.encode(fieldValue, StringUtil.__UTF8_CHARSET.toString());
+            // URLEncoder likes to use '+' for spaces
+            encodedValue = encodedValue.replace("+", "%20");
+            return sobjectsUrl(sObjectName + "/" + fieldName + "/" + encodedValue);
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            throw new IllegalArgumentException(msg, e);
+        }
+    }
+
+    protected void setAccessToken(HttpExchange httpExchange) {
+        httpExchange.setRequestHeader(TOKEN_HEADER, TOKEN_PREFIX + accessToken);
+    }
+
+    private static class DelegatingClientCallback implements ClientResponseCallback {
+        private final ResponseCallback callback;
+
+        public DelegatingClientCallback(ResponseCallback callback) {
+            this.callback = callback;
+        }
+
+        @Override
+        public void onResponse(InputStream response, SalesforceException ex) {
+            callback.onResponse(response, ex);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
new file mode 100644
index 0000000..30186d9
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+
+import java.io.InputStream;
+
+public interface RestClient {
+
+    public static interface ResponseCallback {
+        void onResponse(InputStream response, SalesforceException exception);
+    }
+
+    /**
+     * Lists summary information about each API version currently available,
+     * including the version, label, and a link to each version's root.
+     *
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getVersions(ResponseCallback callback);
+
+    /**
+     * Lists available resources for the specified API version, including resource name and URI.
+     *
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getResources(ResponseCallback callback);
+
+    /**
+     * Lists the available objects and their metadata for your organization's data.
+     *
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getGlobalObjects(ResponseCallback callback);
+
+    /**
+     * Describes the individual metadata for the specified object.
+     *
+     * @param sObjectName specified object name
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getBasicInfo(String sObjectName, ResponseCallback callback);
+
+    /**
+     * Completely describes the individual metadata at all levels for the specified object.
+     *
+     * @param sObjectName specified object name
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getDescription(String sObjectName, ResponseCallback callback);
+
+    /**
+     * Retrieves a record for the specified object ID.
+     *
+     * @param sObjectName specified object name
+     * @param id          object id
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getSObject(String sObjectName, String id, String[] fields, ResponseCallback callback);
+
+    /**
+     * Creates a record for the specified object.
+     *
+     * @param sObjectName specified object name
+     * @param sObject     request entity
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void createSObject(String sObjectName, InputStream sObject, ResponseCallback callback);
+
+    /**
+     * Updates a record for the specified object ID.
+     *
+     * @param sObjectName specified object name
+     * @param id          object id
+     * @param sObject     request entity
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void updateSObject(String sObjectName, String id, InputStream sObject, ResponseCallback callback);
+
+    /**
+     * Deletes a record for the specified object ID.
+     *
+     * @param sObjectName specified object name
+     * @param id          object id
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void deleteSObject(String sObjectName, String id, ResponseCallback callback);
+
+    /**
+     * Retrieves a record for the specified external ID.
+     *
+     * @param sObjectName specified object name
+     * @param fieldName external field name
+     * @param fieldValue external field value
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void getSObjectWithId(String sObjectName, String fieldName, String fieldValue, ResponseCallback callback);
+
+    /**
+     * Creates or updates a record based on the value of a specified external ID field.
+     *
+     * @param sObjectName specified object name
+     * @param fieldName external field name
+     * @param fieldValue external field value
+     * @param sObject input object to insert or update
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void upsertSObject(String sObjectName,
+                              String fieldName, String fieldValue, InputStream sObject, ResponseCallback callback);
+
+    /**
+     * Deletes a record based on the value of a specified external ID field.
+     *
+     * @param sObjectName specified object name
+     * @param fieldName external field name
+     * @param fieldValue external field value
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void deleteSObjectWithId(String sObjectName,
+                             String fieldName, String fieldValue, ResponseCallback callback);
+
+
+    /**
+     * Retrieves the specified blob field from an individual record.
+     *
+     */
+    void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback);
+
+/*
+    TODO
+    SObject User Password
+    /vXX.X/sobjects/User/user id/password
+    /vXX.X/sobjects/SelfServiceUser/self service user id/password
+
+    These methods set, reset, or get information about a user password.
+*/
+
+    /**
+     * Executes the specified SOQL query.
+     *
+     * @param soqlQuery SOQL query
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void query(String soqlQuery, ResponseCallback callback);
+
+    /**
+     * Get SOQL query results using nextRecordsUrl.
+     *
+     * @param nextRecordsUrl URL for next records to fetch, returned by query()
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void queryMore(String nextRecordsUrl, ResponseCallback callback);
+
+    /**
+     * Executes the specified SOSL search.
+     *
+     * @param soslQuery SOSL query
+     * @param callback {@link ResponseCallback} to handle response or exception
+    */
+    void search(String soslQuery, ResponseCallback callback);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
new file mode 100644
index 0000000..b17c5e1
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.ContentExchange;
+
+/**
+ * Wraps a Salesforce Http Exchange
+ */
+public class SalesforceExchange extends ContentExchange {
+
+    private AbstractClientBase client;
+
+    public AbstractClientBase getClient() {
+        return client;
+    }
+
+    public void setClient(AbstractClientBase client) {
+        this.client = client;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
new file mode 100644
index 0000000..5eec212
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.HttpDestination;
+import org.eclipse.jetty.client.HttpEventListenerWrapper;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SalesforceSecurityListener extends HttpEventListenerWrapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SalesforceSecurityListener.class);
+
+    private final HttpDestination destination;
+    private final HttpExchange exchange;
+    private final SalesforceSession session;
+
+    private String currentToken;
+    private int retries;
+    private boolean retrying;
+    private boolean requestComplete;
+    private boolean responseComplete;
+
+    public SalesforceSecurityListener(HttpDestination destination, HttpExchange exchange,
+                                      SalesforceSession session, String accessToken) {
+        super(exchange.getEventListener(), true);
+        this.destination = destination;
+        this.exchange = exchange;
+        this.session = session;
+        this.currentToken = accessToken;
+    }
+
+    @Override
+    public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+        if (status == HttpStatus.UNAUTHORIZED_401 &&
+            retries < destination.getHttpClient().maxRetries()) {
+
+            LOG.warn("Retrying on Salesforce authentication error [{}]: [{}]", status, reason);
+            setDelegatingRequests(false);
+            setDelegatingResponses(false);
+
+            retrying = true;
+        }
+        super.onResponseStatus(version, status, reason);
+    }
+
+    @Override
+    public void onRequestComplete() throws IOException {
+        requestComplete = true;
+        if (checkExchangeComplete()) {
+            super.onRequestComplete();
+        }
+    }
+
+    @Override
+    public void onResponseComplete() throws IOException {
+        responseComplete = true;
+        if (checkExchangeComplete()) {
+            super.onResponseComplete();
+        }
+    }
+
+    private boolean checkExchangeComplete() throws IOException {
+        if (retrying && requestComplete && responseComplete) {
+            LOG.debug("Authentication Error, retrying: {}", exchange);
+
+            requestComplete = false;
+            responseComplete = false;
+
+            setDelegatingRequests(true);
+            setDelegatingResponses(true);
+
+            try {
+                // get a new token and retry
+                currentToken = session.login(currentToken);
+
+                if (exchange instanceof SalesforceExchange) {
+                    final SalesforceExchange salesforceExchange = (SalesforceExchange) exchange;
+                    final AbstractClientBase client = salesforceExchange.getClient();
+
+                    // update client cache for this and future requests
+                    client.setAccessToken(currentToken);
+                    client.setInstanceUrl(session.getInstanceUrl());
+                    client.setAccessToken(exchange);
+                } else {
+                    exchange.addRequestHeader(HttpHeaders.AUTHORIZATION,
+                        "OAuth " + currentToken);
+                }
+
+                // TODO handle a change in Salesforce instanceUrl, right now we retry with the same destination
+                destination.resend(exchange);
+
+                // resending, exchange is not done
+                return false;
+
+            } catch (SalesforceException e) {
+                // logging here, since login exception is not propagated!
+                LOG.error(e.getMessage(), e);
+
+                // the HTTP status and reason is pushed up
+                setDelegationResult(false);
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public void onRetry() {
+        // ignore retries from other interceptors
+        if (retrying) {
+            retrying = false;
+            retries++;
+
+            setDelegatingRequests(true);
+            setDelegatingResponses(true);
+
+            requestComplete = false;
+            responseComplete = false;
+        }
+        super.onRetry();
+    }
+
+    @Override
+    public void onConnectionFailed(Throwable ex) {
+        setDelegatingRequests(true);
+        setDelegatingResponses(true);
+        // delegate connection failures
+        super.onConnectionFailed(ex);
+    }
+
+    @Override
+    public void onException(Throwable ex) {
+        setDelegatingRequests(true);
+        setDelegatingResponses(true);
+        // delegate exceptions
+        super.onException(ex);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
new file mode 100644
index 0000000..0f567e6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thin wrapper to handle callbacks for {@link RestClient.ResponseCallback} and allow waiting for results
+ */
+public class SyncResponseCallback implements RestClient.ResponseCallback {
+
+    private InputStream response;
+    private SalesforceException exception;
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void onResponse(InputStream response, SalesforceException exception) {
+        this.response = response;
+        this.exception = exception;
+        latch.countDown();
+    }
+
+    public void reset() {
+        latch = new CountDownLatch(1);
+    }
+
+    public boolean await(long duration, TimeUnit unit) throws InterruptedException {
+        return latch.await(duration, unit);
+    }
+
+    public InputStream getResponse() {
+        return response;
+    }
+
+    public SalesforceException getException() {
+        return exception;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
new file mode 100644
index 0000000..cec3ef8
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DTO for Salesforce login error
+ */
+public class LoginError {
+
+    private String error;
+
+    private String errorDescription;
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    @JsonProperty("error_description")
+    public String getErrorDescription() {
+        return errorDescription;
+    }
+
+    @JsonProperty("error_description")
+    public void setErrorDescription(String errorDescription) {
+        this.errorDescription = errorDescription;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
new file mode 100644
index 0000000..c23338e
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DTO for Salesforce login
+ */
+public class LoginToken {
+
+    private String accessToken;
+
+    private String instanceUrl;
+
+    private String id;
+
+    private String signature;
+
+    private String issuedAt;
+
+    @JsonProperty("access_token")
+    public String getAccessToken() {
+        return accessToken;
+    }
+
+    @JsonProperty("access_token")
+    public void setAccessToken(String accessToken) {
+        this.accessToken = accessToken;
+    }
+
+    @JsonProperty("instance_url")
+    public String getInstanceUrl() {
+        return instanceUrl;
+    }
+
+    @JsonProperty("instance_url")
+    public void setInstanceUrl(String instanceUrl) {
+        this.instanceUrl = instanceUrl;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getSignature() {
+        return signature;
+    }
+
+    public void setSignature(String signature) {
+        this.signature = signature;
+    }
+
+    @JsonProperty("issued_at")
+    public String getIssuedAt() {
+        return issuedAt;
+    }
+
+    @JsonProperty("issued_at")
+    public void setIssuedAt(String issuedAt) {
+        this.issuedAt = issuedAt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
new file mode 100644
index 0000000..970b9aa
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonValue;
+
+/**
+ * Salesforce Enumeration DTO for picklist NotifyForFields
+ */
+public enum NotifyForFieldsEnum {
+
+    SELECT("Select"),
+    WHERE("Where"),
+    REFERENCED("Referenced"),
+    ALL("All");
+
+    final String value;
+
+    private NotifyForFieldsEnum(String value) {
+        this.value = value;
+    }
+
+    @JsonValue
+    public String value() {
+        return this.value;
+    }
+
+    @JsonCreator
+    public static NotifyForFieldsEnum fromValue(String value) {
+        for (NotifyForFieldsEnum e : NotifyForFieldsEnum.values()) {
+            if (e.value.equals(value)) {
+                return e;
+            }
+        }
+        throw new IllegalArgumentException(value);
+    }
+
+}
\ No newline at end of file