You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dh...@apache.org on 2015/04/06 22:37:56 UTC
[04/23] camel git commit: Added Salesforce component
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
new file mode 100644
index 0000000..a16a2a3
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal;
+
+import org.apache.camel.Service;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.UrlEncoded;
+import org.apache.camel.component.salesforce.SalesforceLoginConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.internal.dto.LoginError;
+import org.apache.camel.component.salesforce.internal.dto.LoginToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class SalesforceSession implements Service {
+
+ private static final String OAUTH2_REVOKE_PATH = "/services/oauth2/revoke?token=";
+ private static final String OAUTH2_TOKEN_PATH = "/services/oauth2/token";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SalesforceSession.class);
+ private static final String FORM_CONTENT_TYPE = "application/x-www-form-urlencoded;charset=utf-8";
+
+ private final HttpClient httpClient;
+
+ private final SalesforceLoginConfig config;
+
+ private final ObjectMapper objectMapper;
+ private final Set<SalesforceSessionListener> listeners;
+
+ private String accessToken;
+ private String instanceUrl;
+
+ public SalesforceSession(HttpClient httpClient, SalesforceLoginConfig config) {
+ // validate parameters
+ assertNotNull("Null httpClient", httpClient);
+ assertNotNull("Null SalesforceLoginConfig", config);
+ assertNotNull("Null loginUrl", config.getLoginUrl());
+ assertNotNull("Null clientId", config.getClientId());
+ assertNotNull("Null clientSecret", config.getClientSecret());
+ assertNotNull("Null userName", config.getUserName());
+ assertNotNull("Null password", config.getPassword());
+
+ this.httpClient = httpClient;
+ this.config = config;
+
+ // strip trailing '/'
+ String loginUrl = config.getLoginUrl();
+ config.setLoginUrl(loginUrl.endsWith("/") ? loginUrl.substring(0, loginUrl.length() - 1) : loginUrl);
+
+ this.objectMapper = new ObjectMapper();
+ this.listeners = new CopyOnWriteArraySet<SalesforceSessionListener>();
+ }
+
+ /**
+  * Rejects a null value with an {@link IllegalArgumentException} carrying the given message.
+  */
+ private void assertNotNull(String message, Object value) {
+     if (value != null) {
+         return;
+     }
+     throw new IllegalArgumentException(message);
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized String login(String oldToken) throws SalesforceException {
+
+ // check if we need a new session
+ // this way there's always a single valid session
+ if ((accessToken == null) || accessToken.equals(oldToken)) {
+
+ // try revoking the old access token before creating a new one
+ accessToken = oldToken;
+ if (accessToken != null) {
+ try {
+ logout();
+ } catch (SalesforceException e) {
+ LOG.warn("Error revoking old access token: " + e.getMessage(), e);
+ }
+ accessToken = null;
+ }
+
+ // login to Salesforce and get session id
+ final StatusExceptionExchange loginPost = new StatusExceptionExchange(true);
+ loginPost.setURL(config.getLoginUrl() + OAUTH2_TOKEN_PATH);
+ loginPost.setMethod(HttpMethods.POST);
+ loginPost.setRequestContentType(FORM_CONTENT_TYPE);
+
+ final UrlEncoded nvps = new UrlEncoded();
+ nvps.put("grant_type", "password");
+ nvps.put("client_id", config.getClientId());
+ nvps.put("client_secret", config.getClientSecret());
+ nvps.put("username", config.getUserName());
+ nvps.put("password", config.getPassword());
+ nvps.put("format", "json");
+
+ try {
+
+ // set form content
+ loginPost.setRequestContent(new ByteArrayBuffer(
+ nvps.encode(StringUtil.__UTF8, true).getBytes(StringUtil.__UTF8)));
+ httpClient.send(loginPost);
+
+ // wait for the login to finish
+ final int exchangeState = loginPost.waitForDone();
+
+ switch (exchangeState) {
+ case HttpExchange.STATUS_COMPLETED:
+ final byte[] responseContent = loginPost.getResponseContentBytes();
+ final int responseStatus = loginPost.getResponseStatus();
+ switch (responseStatus) {
+
+ case HttpStatus.OK_200:
+ // parse the response to get token
+ LoginToken token = objectMapper.readValue(responseContent,
+ LoginToken.class);
+
+ // don't log token or instance URL for security reasons
+ LOG.info("Login successful");
+ accessToken = token.getAccessToken();
+ instanceUrl = token.getInstanceUrl();
+
+ // notify all listeners
+ for (SalesforceSessionListener listener : listeners) {
+ try {
+ listener.onLogin(accessToken, instanceUrl);
+ } catch (Throwable t) {
+ LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage());
+ }
+ }
+
+ break;
+
+ case HttpStatus.BAD_REQUEST_400:
+ // parse the response to get error
+ final LoginError error = objectMapper.readValue(responseContent,
+ LoginError.class);
+ final String msg = String.format("Login error code:[%s] description:[%s]",
+ error.getError(), error.getErrorDescription());
+ final List<RestError> errors = new ArrayList<RestError>();
+ errors.add(new RestError(msg, error.getErrorDescription()));
+ throw new SalesforceException(errors, HttpStatus.BAD_REQUEST_400);
+
+ default:
+ throw new SalesforceException(
+ String.format("Login error status:[%s] reason:[%s]",
+ responseStatus, loginPost.getReason()),
+ responseStatus);
+ }
+ break;
+
+ case HttpExchange.STATUS_EXCEPTED:
+ final Throwable ex = loginPost.getException();
+ throw new SalesforceException(
+ String.format("Unexpected login exception: %s", ex.getMessage()),
+ ex);
+
+ case HttpExchange.STATUS_CANCELLED:
+ throw new SalesforceException("Login request CANCELLED!", null);
+
+ case HttpExchange.STATUS_EXPIRED:
+ throw new SalesforceException("Login request TIMEOUT!", null);
+
+ }
+
+ } catch (IOException e) {
+ String msg = "Login error: unexpected exception " + e.getMessage();
+ throw new SalesforceException(msg, e);
+ } catch (InterruptedException e) {
+ String msg = "Login error: unexpected exception " + e.getMessage();
+ throw new SalesforceException(msg, e);
+ }
+ }
+
+ return accessToken;
+ }
+
+ /**
+  * Revokes the current access token, if any, and clears the cached session state.
+  * <p>
+  * Does nothing when no token is cached. Otherwise the cached token and instance URL are cleared
+  * and all listeners receive {@code onLogout()}, whether or not the revoke request succeeded.
+  * The method is synchronized because {@link #login(String)} updates the same fields under
+  * this object's monitor.
+  *
+  * @throws SalesforceException if the revoke request fails, is cancelled, times out,
+  *                             or returns a status other than 200
+  */
+ public synchronized void logout() throws SalesforceException {
+     if (accessToken == null) {
+         return;
+     }
+
+     StatusExceptionExchange logoutGet = new StatusExceptionExchange(true);
+     logoutGet.setURL(config.getLoginUrl() + OAUTH2_REVOKE_PATH + accessToken);
+     logoutGet.setMethod(HttpMethods.GET);
+
+     try {
+         httpClient.send(logoutGet);
+         final int done = logoutGet.waitForDone();
+         switch (done) {
+
+         case HttpExchange.STATUS_COMPLETED:
+             final int statusCode = logoutGet.getResponseStatus();
+             final String reason = logoutGet.getReason();
+
+             if (statusCode == HttpStatus.OK_200) {
+                 LOG.info("Logout successful");
+             } else {
+                 throw new SalesforceException(
+                     String.format("Logout error, code: [%s] reason: [%s]",
+                         statusCode, reason),
+                     statusCode);
+             }
+             break;
+
+         case HttpExchange.STATUS_EXCEPTED:
+             final Throwable ex = logoutGet.getException();
+             throw new SalesforceException("Unexpected logout exception: " + ex.getMessage(), ex);
+
+         case HttpExchange.STATUS_CANCELLED:
+             throw new SalesforceException("Logout request CANCELLED!", null);
+
+         case HttpExchange.STATUS_EXPIRED:
+             throw new SalesforceException("Logout request TIMEOUT!", null);
+
+         }
+
+     } catch (SalesforceException e) {
+         throw e;
+     } catch (Exception e) {
+         String msg = "Logout error: " + e.getMessage();
+         throw new SalesforceException(msg, e);
+     } finally {
+         // reset session
+         accessToken = null;
+         instanceUrl = null;
+         // tell every session listener that the session has ended
+         for (SalesforceSessionListener listener : listeners) {
+             try {
+                 listener.onLogout();
+             } catch (Throwable t) {
+                 LOG.warn("Unexpected error from listener {}: {}", listener, t.getMessage());
+             }
+         }
+     }
+ }
+
+ /**
+  * @return the cached access token, or {@code null} when there is no active session
+  */
+ public String getAccessToken() {
+     return this.accessToken;
+ }
+
+ /**
+  * @return the instance URL received with the last login, or {@code null} when there is no active session
+  */
+ public String getInstanceUrl() {
+     return this.instanceUrl;
+ }
+
+ /**
+  * Registers a listener for login and logout notifications.
+  *
+  * @return {@code true} if the listener was not already registered
+  */
+ public boolean addListener(SalesforceSessionListener listener) {
+     final boolean added = listeners.add(listener);
+     return added;
+ }
+
+ /**
+  * Unregisters a previously added listener.
+  *
+  * @return {@code true} if the listener was registered
+  */
+ public boolean removeListener(SalesforceSessionListener listener) {
+     final boolean removed = listeners.remove(listener);
+     return removed;
+ }
+
+ /**
+  * Starts the session by logging in. Passing the cached token means a login is performed
+  * when no token is cached, and the cached token is renewed when one exists.
+  */
+ @Override
+ public void start() throws Exception {
+     final String currentToken = accessToken;
+     login(currentToken);
+ }
+
+ /**
+  * Stops the session by revoking the current token, if there is one.
+  */
+ @Override
+ public void stop() throws Exception {
+     this.logout();
+ }
+
+ /**
+  * {@link ContentExchange} that also remembers the HTTP status reason phrase and the
+  * last failure reported for the exchange.
+  *
+  * @author dbokde
+  */
+ private static class StatusExceptionExchange extends ContentExchange {
+
+     private String statusReason;
+     private Throwable failure;
+
+     public StatusExceptionExchange(boolean cacheFields) {
+         super(cacheFields);
+     }
+
+     @Override
+     protected synchronized void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+         // capture the reason phrase before handing the event on
+         statusReason = reason.toString(StringUtil.__ISO_8859_1);
+         super.onResponseStatus(version, status, reason);
+     }
+
+     @Override
+     protected void onConnectionFailed(Throwable x) {
+         failure = x;
+         super.onConnectionFailed(x);
+     }
+
+     @Override
+     protected void onException(Throwable x) {
+         failure = x;
+         super.onException(x);
+     }
+
+     /** @return the reason phrase of the response status line, or {@code null} if none was received */
+     public String getReason() {
+         return statusReason;
+     }
+
+     /** @return the last connection failure or exception recorded, or {@code null} if none occurred */
+     public Throwable getException() {
+         return failure;
+     }
+
+ }
+
+ /**
+  * Callback for components that need to follow the session's login state.
+  */
+ public interface SalesforceSessionListener {
+
+     /** Invoked after a successful login with the new token and instance URL. */
+     void onLogin(String accessToken, String instanceUrl);
+
+     /** Invoked after the session state has been cleared by a logout. */
+     void onLogout();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
new file mode 100644
index 0000000..6fe7028
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.Service;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpEventListenerWrapper;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpSchemes;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class AbstractClientBase implements SalesforceSession.SalesforceSessionListener, Service {
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ protected static final String APPLICATION_JSON_UTF8 = "application/json;charset=utf-8";
+ protected static final String APPLICATION_XML_UTF8 = "application/xml;charset=utf-8";
+
+ protected final HttpClient httpClient;
+ protected final SalesforceSession session;
+ protected final String version;
+
+ protected String accessToken;
+ protected String instanceUrl;
+
+ /**
+  * Stores the collaborators shared by all API clients. No login happens here; see {@code start()}.
+  *
+  * @param version    API version string kept for use by subclasses
+  * @param session    session providing the access token and instance URL
+  * @param httpClient Jetty client used to send requests
+  * @throws SalesforceException declared for subclasses; not thrown by this constructor
+  */
+ public AbstractClientBase(String version,
+     SalesforceSession session, HttpClient httpClient) throws SalesforceException {
+
+     this.httpClient = httpClient;
+     this.session = session;
+     this.version = version;
+ }
+
+ /**
+  * Caches the session's access token and instance URL locally, logging in first when the
+  * session has no token yet, then registers this client as a session listener.
+  *
+  * @throws Exception if the lazy login fails
+  */
+ @Override
+ public void start() throws Exception {
+     // local cache
+     accessToken = session.getAccessToken();
+     if (accessToken == null) {
+         // lazy login here!
+         accessToken = session.login(accessToken);
+     }
+     instanceUrl = session.getInstanceUrl();
+
+     // also register this client as a session listener
+     session.addListener(this);
+ }
+
+ /**
+  * Stops receiving session notifications. The shared session itself is left untouched.
+  */
+ @Override
+ public void stop() throws Exception {
+     final SalesforceSession.SalesforceSessionListener self = this;
+     session.removeListener(self);
+ }
+
+ /**
+  * Updates the locally cached token and instance URL when the session reports a different token.
+  */
+ @Override
+ public void onLogin(String accessToken, String instanceUrl) {
+     final boolean unchanged = accessToken.equals(this.accessToken);
+     if (unchanged) {
+         return;
+     }
+     this.accessToken = accessToken;
+     this.instanceUrl = instanceUrl;
+ }
+
+ @Override
+ public void onLogout() {
+ // intentionally empty: the cached token is kept; if a later request uses the stale token,
+ // SalesforceSecurityListener (installed in doHttpRequest) is expected to log in again
+ }
+
+ protected SalesforceExchange getContentExchange(String method, String url) {
+ SalesforceExchange get = new SalesforceExchange();
+ get.setMethod(method);
+ get.setURL(url);
+ get.setClient(this);
+ return get;
+ }
+
+ /**
+ * Receives the outcome of a request issued through doHttpRequest: either the response
+ * body as a stream (which may be null when the response had no content) with a null
+ * exception, or a null stream together with the error that occurred.
+ */
+ protected interface ClientResponseCallback {
+ void onResponse(InputStream response, SalesforceException ex);
+ }
+
+ /**
+  * Sends the request asynchronously and reports the outcome through the callback.
+  * <p>
+  * A {@code SalesforceSecurityListener} is installed first to handle login retries, and is then
+  * wrapped by a lifecycle listener that turns connection failures, exceptions, expiry and
+  * non-2xx responses into a {@link SalesforceException}. For 2xx responses the buffered body
+  * is passed on as a stream.
+  * <p>
+  * If the security listener cannot be registered, the error is reported and the request is
+  * not sent. Sending it anyway would run it without login retries and could invoke the
+  * callback a second time.
+  *
+  * @param request  the exchange to send
+  * @param callback receives the response stream or the error
+  */
+ protected void doHttpRequest(final ContentExchange request, final ClientResponseCallback callback) {
+
+     // use SalesforceSecurityListener for security login retries
+     try {
+         final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(request.getScheme()));
+         request.setEventListener(new SalesforceSecurityListener(
+             httpClient.getDestination(request.getAddress(), isHttps),
+             request, session, accessToken));
+     } catch (IOException e) {
+         // propagate exception
+         callback.onResponse(null, new SalesforceException(
+             String.format("Error registering security listener: %s", e.getMessage()),
+             e));
+         // the failure has been reported; do not fall through and send the request
+         return;
+     }
+
+     // use HttpEventListener for lifecycle events
+     request.setEventListener(new HttpEventListenerWrapper(request.getEventListener(), true) {
+
+         // reason phrase of the status line, remembered for error messages
+         private String reason;
+
+         @Override
+         public void onConnectionFailed(Throwable ex) {
+             super.onConnectionFailed(ex);
+             callback.onResponse(null,
+                 new SalesforceException("Connection error: " + ex.getMessage(), ex));
+         }
+
+         @Override
+         public void onException(Throwable ex) {
+             super.onException(ex);
+             callback.onResponse(null,
+                 new SalesforceException("Unexpected exception: " + ex.getMessage(), ex));
+         }
+
+         @Override
+         public void onExpire() {
+             super.onExpire();
+             callback.onResponse(null,
+                 new SalesforceException("Request expired", null));
+         }
+
+         @Override
+         public void onResponseComplete() throws IOException {
+             super.onResponseComplete();
+
+             final int responseStatus = request.getResponseStatus();
+             if (responseStatus < HttpStatus.OK_200 || responseStatus >= HttpStatus.MULTIPLE_CHOICES_300) {
+                 final String msg = String.format("Error {%s:%s} executing {%s:%s}",
+                     responseStatus, reason, request.getMethod(), request.getRequestURI());
+                 final SalesforceException exception = new SalesforceException(msg, createRestException(request));
+                 exception.setStatusCode(responseStatus);
+                 callback.onResponse(null, exception);
+             } else {
+                 // TODO not memory efficient for large response messages,
+                 // doesn't seem to be possible in Jetty 7 to directly stream to response parsers
+                 final byte[] bytes = request.getResponseContentBytes();
+                 callback.onResponse(bytes != null ? new ByteArrayInputStream(bytes) : null, null);
+             }
+
+         }
+
+         @Override
+         public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+             super.onResponseStatus(version, status, reason);
+             // remember status reason
+             this.reason = reason.toString(StringUtil.__ISO_8859_1);
+         }
+     });
+
+     // execute the request
+     try {
+         httpClient.send(request);
+     } catch (IOException e) {
+         String msg = "Unexpected Error: " + e.getMessage();
+         // send error through callback
+         callback.onResponse(null, new SalesforceException(msg, e));
+     }
+
+ }
+
+ /**
+ * Replaces the locally cached access token; the shared session is not modified.
+ */
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
+ /**
+ * Replaces the locally cached instance URL; the shared session is not modified.
+ */
+ public void setInstanceUrl(String instanceUrl) {
+ this.instanceUrl = instanceUrl;
+ }
+
+ /**
+ * Implemented by each API client.
+ * NOTE(review): presumably writes the cached access token onto the given exchange using
+ * the API-specific header; confirm against the subclasses.
+ */
+ protected abstract void setAccessToken(HttpExchange httpExchange);
+
+ /**
+ * Builds the API-specific exception for a completed exchange. doHttpRequest calls this for
+ * non-2xx responses and uses the result as the cause of the exception passed to the callback.
+ */
+ protected abstract SalesforceException createRestException(ContentExchange httpExchange);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
new file mode 100644
index 0000000..b00e3fd
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/BulkApiClient.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Asynchronous client interface for the Salesforce Bulk API.
+ * <p>
+ * Every operation returns immediately and reports its outcome through the supplied callback,
+ * which receives either a result or a {@link SalesforceException}.
+ */
+public interface BulkApiClient {
+
+    /** Receives a single {@link JobInfo} or the error that occurred. */
+    interface JobInfoResponseCallback {
+        void onResponse(JobInfo jobInfo, SalesforceException ex);
+    }
+
+    /** Receives a single {@link BatchInfo} or the error that occurred. */
+    interface BatchInfoResponseCallback {
+        void onResponse(BatchInfo batchInfo, SalesforceException ex);
+    }
+
+    /** Receives the list of batches of a job or the error that occurred. */
+    interface BatchInfoListResponseCallback {
+        void onResponse(List<BatchInfo> batchInfoList, SalesforceException ex);
+    }
+
+    /** Receives a raw response body as a stream or the error that occurred. */
+    interface StreamResponseCallback {
+        void onResponse(InputStream inputStream, SalesforceException ex);
+    }
+
+    /** Receives the result ids of a query batch or the error that occurred. */
+    interface QueryResultIdsCallback {
+        void onResponse(List<String> ids, SalesforceException ex);
+    }
+
+    /**
+     * Creates a Bulk Job
+     *
+     * @param jobInfo  {@link JobInfo} with required fields
+     * @param callback {@link JobInfoResponseCallback} to be invoked on response or error
+     */
+    void createJob(JobInfo jobInfo,
+                   JobInfoResponseCallback callback);
+
+    /**
+     * Retrieves the current state of a job.
+     *
+     * @param jobId    id of the job
+     * @param callback invoked with the job info or error
+     */
+    void getJob(String jobId,
+                JobInfoResponseCallback callback);
+
+    /**
+     * Requests that a job be moved to the closed state.
+     *
+     * @param jobId    id of the job
+     * @param callback invoked with the updated job info or error
+     */
+    void closeJob(String jobId,
+                  JobInfoResponseCallback callback);
+
+    /**
+     * Requests that a job be aborted.
+     *
+     * @param jobId    id of the job
+     * @param callback invoked with the updated job info or error
+     */
+    void abortJob(String jobId,
+                  JobInfoResponseCallback callback);
+
+    /**
+     * Adds a batch to a job using the given stream as the request body.
+     *
+     * @param batchStream     batch content
+     * @param jobId           id of the job the batch belongs to
+     * @param contentTypeEnum content type of the batch content
+     * @param callback        invoked with the created batch info or error
+     */
+    void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum,
+                     BatchInfoResponseCallback callback);
+
+    /**
+     * Retrieves the state of a single batch.
+     *
+     * @param jobId    id of the job
+     * @param batchId  id of the batch
+     * @param callback invoked with the batch info or error
+     */
+    void getBatch(String jobId, String batchId,
+                  BatchInfoResponseCallback callback);
+
+    /**
+     * Retrieves the state of all batches of a job.
+     *
+     * @param jobId    id of the job
+     * @param callback invoked with the batch list or error
+     */
+    void getAllBatches(String jobId,
+                       BatchInfoListResponseCallback callback);
+
+    /**
+     * Retrieves the request of a batch as a raw stream.
+     *
+     * @param jobId    id of the job
+     * @param batchId  id of the batch
+     * @param callback invoked with the response stream or error
+     */
+    void getRequest(String jobId, String batchId,
+                    StreamResponseCallback callback);
+
+    /**
+     * Retrieves the results of a batch as a raw stream.
+     *
+     * @param jobId    id of the job
+     * @param batchId  id of the batch
+     * @param callback invoked with the response stream or error
+     */
+    void getResults(String jobId, String batchId,
+                    StreamResponseCallback callback);
+
+    /**
+     * Adds a query batch to a job, using the SOQL text as the request body.
+     *
+     * @param jobId          id of the job
+     * @param soqlQuery      SOQL query text
+     * @param jobContentType content type of the job
+     * @param callback       invoked with the created batch info or error
+     */
+    void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType,
+                          BatchInfoResponseCallback callback);
+
+    /**
+     * Retrieves the result ids produced by a query batch.
+     *
+     * @param jobId    id of the job
+     * @param batchId  id of the query batch
+     * @param callback invoked with the result ids or error
+     */
+    void getQueryResultIds(String jobId, String batchId,
+                           QueryResultIdsCallback callback);
+
+    /**
+     * Retrieves one query result as a raw stream.
+     *
+     * @param jobId    id of the job
+     * @param batchId  id of the query batch
+     * @param resultId id of the result; presumably one of the ids reported by
+     *                 {@link #getQueryResultIds} (confirm against the implementation)
+     * @param callback invoked with the response stream or error
+     */
+    void getQueryResult(String jobId, String batchId, String resultId,
+                        StreamResponseCallback callback);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
new file mode 100644
index 0000000..b4899e5
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.api.dto.bulk.Error;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+
+import javax.xml.bind.*;
+import javax.xml.transform.stream.StreamSource;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient {
+
+ private static final String TOKEN_HEADER = "X-SFDC-Session";
+
+ private JAXBContext context;
+ private static final ContentType DEFAULT_ACCEPT_TYPE = ContentType.XML;
+ private ObjectFactory objectFactory;
+
+ public DefaultBulkApiClient(String version,
+ SalesforceSession session, HttpClient httpClient) throws SalesforceException {
+ super(version, session, httpClient);
+
+ try {
+ context = JAXBContext.newInstance(JobInfo.class.getPackage().getName(), getClass().getClassLoader());
+ } catch (JAXBException e) {
+ String msg = "Error loading Bulk API DTOs: " + e.getMessage();
+ throw new IllegalArgumentException(msg, e);
+ }
+
+ this.objectFactory = new ObjectFactory();
+ }
+
+ /**
+  * Creates a job: clears the read-only fields of the request, posts it as XML to the job URL
+  * and passes the parsed {@link JobInfo} of the response to the callback.
+  * A marshalling failure is reported through the callback without sending anything.
+  */
+ @Override
+ public void createJob(JobInfo request, final JobInfoResponseCallback callback) {
+
+     // clear system fields if set
+     sanitizeJobRequest(request);
+
+     final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(null));
+     try {
+         marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+     } catch (SalesforceException e) {
+         callback.onResponse(null, e);
+         return;
+     }
+
+     // send the request; the response is parsed inside the callback
+     doHttpRequest(post, new ClientResponseCallback() {
+         @Override
+         public void onResponse(InputStream response, SalesforceException ex) {
+             // nothing to parse: hand the outcome on unchanged
+             if (response == null) {
+                 callback.onResponse(null, ex);
+                 return;
+             }
+
+             JobInfo created = null;
+             SalesforceException failure = ex;
+             try {
+                 created = unmarshalResponse(response, post, JobInfo.class);
+             } catch (SalesforceException e) {
+                 failure = e;
+             }
+             callback.onResponse(created, failure);
+         }
+     });
+
+ }
+
+ /**
+  * Resets the read-only (system-maintained) fields of a job request to null before it is sent.
+  * The passed-in object is modified in place.
+  * <p>
+  * NOTE(review): the original reset systemModstamp twice. The duplicate is removed here, but a
+  * different read-only field may have been intended (e.g. a total processing time); confirm
+  * against JobInfo.
+  */
+ private void sanitizeJobRequest(JobInfo request) {
+     request.setApexProcessingTime(null);
+     request.setApiActiveProcessingTime(null);
+     request.setApiVersion(null);
+     request.setCreatedById(null);
+     request.setCreatedDate(null);
+     request.setId(null);
+     request.setNumberBatchesCompleted(null);
+     request.setNumberBatchesFailed(null);
+     request.setNumberBatchesInProgress(null);
+     request.setNumberBatchesQueued(null);
+     request.setNumberBatchesTotal(null);
+     request.setNumberRecordsFailed(null);
+     request.setNumberRecordsProcessed(null);
+     request.setNumberRetries(null);
+     request.setState(null);
+     request.setSystemModstamp(null);
+ }
+
+ @Override
+ public void getJob(String jobId, final JobInfoResponseCallback callback) {
+
+ final ContentExchange get = getContentExchange(HttpMethods.GET, jobUrl(jobId));
+
+ // make the call and parse the result
+ doHttpRequest(get, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ JobInfo value = null;
+ try {
+ value = unmarshalResponse(response, get, JobInfo.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value, ex);
+ }
+ });
+
+ }
+
+ @Override
+ public void closeJob(String jobId, final JobInfoResponseCallback callback) {
+ final JobInfo request = new JobInfo();
+ request.setState(JobStateEnum.CLOSED);
+
+ final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId));
+ try {
+ marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+ } catch (SalesforceException e) {
+ callback.onResponse(null, e);
+ return;
+ }
+
+ // make the call and parse the result
+ doHttpRequest(post, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ JobInfo value = null;
+ try {
+ value = unmarshalResponse(response, post, JobInfo.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value, ex);
+ }
+ });
+
+ }
+
+ @Override
+ public void abortJob(String jobId, final JobInfoResponseCallback callback) {
+ final JobInfo request = new JobInfo();
+ request.setState(JobStateEnum.ABORTED);
+
+ final ContentExchange post = getContentExchange(HttpMethods.POST, jobUrl(jobId));
+ try {
+ marshalRequest(objectFactory.createJobInfo(request), post, APPLICATION_XML_UTF8);
+ } catch (SalesforceException e) {
+ callback.onResponse(null, e);
+ return;
+ }
+
+ // make the call and parse the result
+ doHttpRequest(post, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ JobInfo value = null;
+ try {
+ value = unmarshalResponse(response, post, JobInfo.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value, ex);
+ }
+ });
+
+ }
+
+ @Override
+ public void createBatch(InputStream batchStream, String jobId, ContentType contentTypeEnum,
+ final BatchInfoResponseCallback callback) {
+
+ final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null));
+ post.setRequestContentSource(batchStream);
+ post.setRequestContentType(getContentType(contentTypeEnum) + ";charset=" + StringUtil.__UTF8);
+
+ // make the call and parse the result
+ doHttpRequest(post, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ BatchInfo value = null;
+ try {
+ value = unmarshalResponse(response, post, BatchInfo.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value, ex);
+ }
+ });
+
+ }
+
+ @Override
+ public void getBatch(String jobId, String batchId,
+ final BatchInfoResponseCallback callback) {
+
+ final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, batchId));
+
+ // make the call and parse the result
+ doHttpRequest(get, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ BatchInfo value = null;
+ try {
+ value = unmarshalResponse(response, get, BatchInfo.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value, ex);
+ }
+ });
+
+ }
+
+ @Override
+ public void getAllBatches(String jobId,
+ final BatchInfoListResponseCallback callback) {
+
+ final ContentExchange get = getContentExchange(HttpMethods.GET, batchUrl(jobId, null));
+
+ // make the call and parse the result
+ doHttpRequest(get, new ClientResponseCallback() {
+ @Override
+ public void onResponse(InputStream response, SalesforceException ex) {
+ BatchInfoList value = null;
+ try {
+ value = unmarshalResponse(response, get, BatchInfoList.class);
+ } catch (SalesforceException e) {
+ ex = e;
+ }
+ callback.onResponse(value != null ? value.getBatchInfo() : null, ex);
+ }
+ });
+
+ }
+
+ /**
+  * Issues a GET on the batch URL and hands the raw response stream to the callback unparsed.
+  * <p>
+  * NOTE(review): this uses the same URL as getBatch. The Bulk API documents the batch request
+  * resource under ".../batch/{batchId}/request"; confirm that batchUrl yields the intended resource.
+  */
+ @Override
+ public void getRequest(String jobId, String batchId,
+     final StreamResponseCallback callback) {
+
+     final String url = batchUrl(jobId, batchId);
+     final ContentExchange exchange = getContentExchange(HttpMethods.GET, url);
+
+     // pass the outcome straight through; the caller consumes the stream
+     final ClientResponseCallback passThrough = new ClientResponseCallback() {
+         @Override
+         public void onResponse(InputStream response, SalesforceException ex) {
+             callback.onResponse(response, ex);
+         }
+     };
+     doHttpRequest(exchange, passThrough);
+
+ }
+
+    /**
+     * Issues a GET on {@code batchResultUrl(jobId, batchId, null)} and passes the raw
+     * response stream to the callback without parsing it.
+     *
+     * @param jobId    job id used to build the result URL
+     * @param batchId  batch id used to build the result URL
+     * @param callback receives the unparsed response stream and any exception
+     */
+    @Override
+    public void getResults(String jobId, String batchId,
+                           final StreamResponseCallback callback) {
+        final String resultUrl = batchResultUrl(jobId, batchId, null);
+        final ContentExchange resultRequest = getContentExchange(HttpMethods.GET, resultUrl);
+
+        // stream and exception are forwarded to the caller untouched
+        final ClientResponseCallback passThrough = new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                callback.onResponse(response, ex);
+            }
+        };
+        doHttpRequest(resultRequest, passThrough);
+    }
+
+    /**
+     * POSTs a SOQL query as the body of a new batch on {@code batchUrl(jobId, null)} and
+     * passes the unmarshalled {@link BatchInfo} to the callback.
+     *
+     * <p>The query text is sent as UTF-8 bytes; the request content type is the media type
+     * for {@code jobContentType} followed by a UTF-8 charset parameter.
+     *
+     * @param jobId          job id used to build the batch URL
+     * @param soqlQuery      query text sent as the request body
+     * @param jobContentType content type used to build the request's content-type header
+     * @param callback       receives the parsed {@link BatchInfo} (or {@code null}) and any exception
+     */
+    @Override
+    public void createBatchQuery(String jobId, String soqlQuery, ContentType jobContentType,
+                                 final BatchInfoResponseCallback callback) {
+
+        final ContentExchange post = getContentExchange(HttpMethods.POST, batchUrl(jobId, null));
+        byte[] queryBytes = soqlQuery.getBytes(StringUtil.__UTF8_CHARSET);
+        post.setRequestContent(new ByteArrayBuffer(queryBytes));
+        post.setRequestContentType(getContentType(jobContentType) + ";charset=" + StringUtil.__UTF8);
+
+        // make the call and parse the result
+        doHttpRequest(post, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                BatchInfo value = null;
+                // Only parse when a response stream exists: trying to unmarshal a missing
+                // stream would replace the incoming exception with an unmarshalling error.
+                if (response != null) {
+                    try {
+                        value = unmarshalResponse(response, post, BatchInfo.class);
+                    } catch (SalesforceException e) {
+                        ex = e;
+                    }
+                }
+                callback.onResponse(value, ex);
+            }
+        });
+
+    }
+
+    /**
+     * Issues a GET on {@code batchResultUrl(jobId, batchId, null)}, unmarshals the response
+     * as a {@link QueryResultList} and passes an unmodifiable view of its results to the callback.
+     *
+     * @param jobId    job id used to build the result URL
+     * @param batchId  batch id used to build the result URL
+     * @param callback receives the unmodifiable result list (or {@code null}) and any exception
+     */
+    @Override
+    public void getQueryResultIds(String jobId, String batchId,
+                                  final QueryResultIdsCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, batchResultUrl(jobId, batchId, null));
+
+        // make the call and parse the result
+        doHttpRequest(get, new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                QueryResultList value = null;
+                // Only parse when a response stream exists: trying to unmarshal a missing
+                // stream would replace the incoming exception with an unmarshalling error.
+                if (response != null) {
+                    try {
+                        value = unmarshalResponse(response, get, QueryResultList.class);
+                    } catch (SalesforceException e) {
+                        ex = e;
+                    }
+                }
+                callback.onResponse(value != null ? Collections.unmodifiableList(value.getResult()) : null,
+                    ex);
+            }
+        });
+
+    }
+
+    /**
+     * Issues a GET on {@code batchResultUrl(jobId, batchId, resultId)} and passes the raw
+     * response stream to the callback without parsing it.
+     *
+     * @param jobId    job id used to build the result URL
+     * @param batchId  batch id used to build the result URL
+     * @param resultId result id appended to the result URL
+     * @param callback receives the unparsed response stream and any exception
+     */
+    @Override
+    public void getQueryResult(String jobId, String batchId, String resultId,
+                               final StreamResponseCallback callback) {
+        final String resultUrl = batchResultUrl(jobId, batchId, resultId);
+        final ContentExchange resultRequest = getContentExchange(HttpMethods.GET, resultUrl);
+
+        // stream and exception are forwarded to the caller untouched
+        final ClientResponseCallback passThrough = new ClientResponseCallback() {
+            @Override
+            public void onResponse(InputStream response, SalesforceException ex) {
+                callback.onResponse(response, ex);
+            }
+        };
+        doHttpRequest(resultRequest, passThrough);
+
+    }
+
+ @Override
+ protected void setAccessToken(HttpExchange httpExchange) {
+ httpExchange.setRequestHeader(TOKEN_HEADER, accessToken);
+ }
+
+    /**
+     * Adds the headers every request of this client carries, then delegates to the superclass.
+     *
+     * <p>The access token and the UTF-8 {@code Accept-Charset} header are always set. An
+     * {@code Accept} header is only added when the request does not already have one.
+     *
+     * @param request  exchange to decorate and send
+     * @param callback passed through unchanged to the superclass
+     */
+    @Override
+    protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) {
+        // access token goes on all requests
+        setAccessToken(request);
+
+        // default charset
+        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8);
+
+        // TODO check if this is really needed or not, since SF response content type seems fixed
+        // fall back to the default accept content type only when the caller set none
+        final boolean acceptAlreadySet = request.getRequestFields().containsKey(HttpHeaders.ACCEPT);
+        if (!acceptAlreadySet) {
+            // request content type and charset is set by the request entity
+            request.setRequestHeader(HttpHeaders.ACCEPT, getContentType(DEFAULT_ACCEPT_TYPE));
+        }
+
+        super.doHttpRequest(request, callback);
+    }
+
+    /**
+     * Maps a {@link ContentType} to the media type string used in HTTP headers.
+     *
+     * <p>The ZIP variants are spelled out as literals. Deriving them with
+     * {@code toString().toLowerCase()} depends on the JVM default locale and yields a
+     * dotless i (for example {@code zıp/csv}) under a Turkish locale.
+     *
+     * @param type content type to translate
+     * @return the media type string, or {@code null} for a value not handled here
+     */
+    private static String getContentType(ContentType type) {
+        String result = null;
+
+        switch (type) {
+        case CSV:
+            result = "text/csv";
+            break;
+
+        case XML:
+            result = "application/xml";
+            break;
+
+        case ZIP_CSV:
+            result = "zip/csv";
+            break;
+
+        case ZIP_XML:
+            result = "zip/xml";
+            break;
+
+        default:
+            // unhandled values keep the null result
+            break;
+        }
+
+        return result;
+    }
+
+    /**
+     * Builds a {@link SalesforceException} from the error response of an exchange.
+     *
+     * <p>The response body is unmarshalled as an {@code Error}; its exception code and
+     * message are copied into a {@link RestError}. When the response has no body, an
+     * exception carrying only the HTTP status is returned. When unmarshalling fails, the
+     * returned exception wraps the unmarshalling failure.
+     *
+     * @param request exchange whose response content and status are read
+     * @return exception describing the error response
+     */
+    @Override
+    protected SalesforceException createRestException(ContentExchange request) {
+        // an error response without a body has nothing to unmarshal; wrapping null
+        // bytes in a ByteArrayInputStream would throw a NullPointerException
+        final byte[] content = request.getResponseContentBytes();
+        if (content == null) {
+            return new SalesforceException("Unexpected error: empty error response",
+                request.getResponseStatus());
+        }
+
+        // this must be of type Error
+        try {
+            final Error error = unmarshalResponse(new ByteArrayInputStream(content),
+                request, Error.class);
+
+            final RestError restError = new RestError();
+            restError.setErrorCode(error.getExceptionCode());
+            restError.setMessage(error.getExceptionMessage());
+
+            return new SalesforceException(Arrays.asList(restError), request.getResponseStatus());
+        } catch (SalesforceException e) {
+            String msg = "Error un-marshaling Salesforce Error: " + e.getMessage();
+            return new SalesforceException(msg, e);
+        }
+    }
+
+ private <T> T unmarshalResponse(InputStream response, ContentExchange request, Class<T> resultClass)
+ throws SalesforceException {
+ try {
+ Unmarshaller unmarshaller = context.createUnmarshaller();
+ JAXBElement<T> result = unmarshaller.unmarshal(new StreamSource(response), resultClass);
+ return result.getValue();
+ } catch (JAXBException e) {
+ throw new SalesforceException(
+ String.format("Error unmarshaling response {%s:%s} : %s",
+ request.getMethod(), request.getRequestURI(), e.getMessage()),
+ e);
+ } catch (IllegalArgumentException e) {
+ throw new SalesforceException(
+ String.format("Error unmarshaling response for {%s:%s} : %s",
+ request.getMethod(), request.getRequestURI(), e.getMessage()),
+ e);
+ }
+ }
+
+ private void marshalRequest(Object input, ContentExchange request, String contentType)
+ throws SalesforceException {
+ try {
+ Marshaller marshaller = context.createMarshaller();
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ marshaller.marshal(input, byteStream);
+ request.setRequestContent(new ByteArrayBuffer(byteStream.toByteArray()));
+ request.setRequestContentType(contentType);
+ } catch (JAXBException e) {
+ throw new SalesforceException(
+ String.format("Error marshaling request for {%s:%s} : %s",
+ request.getMethod(), request.getRequestURI(), e.getMessage()),
+ e);
+ } catch (IllegalArgumentException e) {
+ throw new SalesforceException(
+ String.format("Error marshaling request for {%s:%s} : %s",
+ request.getMethod(), request.getRequestURI(), e.getMessage()),
+ e);
+ }
+ }
+
+ private String jobUrl(String jobId) {
+ if (jobId != null) {
+ return super.instanceUrl + "/services/async/" + version + "/job/" + jobId;
+ } else {
+ return super.instanceUrl + "/services/async/" + version + "/job";
+ }
+ }
+
+ private String batchUrl(String jobId, String batchId) {
+ if (batchId != null) {
+ return jobUrl(jobId) + "/batch/" + batchId;
+ } else {
+ return jobUrl(jobId) + "/batch";
+ }
+ }
+
+ private String batchResultUrl(String jobId, String batchId, String resultId) {
+ if (resultId != null) {
+ return batchUrl(jobId, batchId) + "/result/" + resultId;
+ } else {
+ return batchUrl(jobId, batchId) + "/result";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
new file mode 100644
index 0000000..07f5af2
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultRestClient.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import com.thoughtworks.xstream.XStream;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.util.StringUtil;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.RestError;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.component.salesforce.internal.dto.RestErrors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.List;
+
+/**
+ * {@link RestClient} implementation that builds Jetty {@link ContentExchange}s for the
+ * {@code /services/data/} resources and sends them through {@link AbstractClientBase}.
+ *
+ * <p>The {@code format} passed to the constructor selects the media type: {@code "json"}
+ * uses the JSON constants, any other value uses the XML ones. It also selects which
+ * parser is used to read error responses.
+ */
+public class DefaultRestClient extends AbstractClientBase implements RestClient {
+
+    private static final String SERVICES_DATA = "/services/data/";
+    private static final String TOKEN_HEADER = "Authorization";
+    private static final String TOKEN_PREFIX = "Bearer ";
+
+    private final ObjectMapper objectMapper;
+    private final XStream xStream;
+    protected String format;
+
+    /**
+     * Creates a client and initializes the JSON and XML error parsers.
+     *
+     * @param httpClient HTTP client passed to the superclass
+     * @param version    API version passed to the superclass
+     * @param format     payload format; {@code "json"} selects JSON, anything else XML
+     * @param session    session passed to the superclass
+     * @throws SalesforceException declared for the superclass constructor
+     */
+    public DefaultRestClient(HttpClient httpClient,
+                             String version, String format, SalesforceSession session) throws SalesforceException {
+        super(version, session, httpClient);
+
+        this.format = format;
+
+        // initialize error parsers for JSON and XML
+        this.objectMapper = new ObjectMapper();
+        this.xStream = new XStream();
+        xStream.processAnnotations(RestErrors.class);
+    }
+
+    /**
+     * Sets the {@code Accept} and {@code Accept-Charset} headers, then delegates to the superclass.
+     */
+    @Override
+    protected void doHttpRequest(ContentExchange request, ClientResponseCallback callback) {
+        // set standard headers for all requests
+        request.setRequestHeader(HttpHeaders.ACCEPT, formatMediaType());
+        request.setRequestHeader(HttpHeaders.ACCEPT_CHARSET, StringUtil.__UTF8);
+        // request content type and charset is set by the request entity
+
+        super.doHttpRequest(request, callback);
+    }
+
+    /**
+     * Parses the error response according to {@code format}. When parsing fails the
+     * failure is logged and an exception carrying only the HTTP status is returned.
+     */
+    @Override
+    protected SalesforceException createRestException(ContentExchange httpExchange) {
+        // try parsing response according to format
+        try {
+            if (isJsonFormat()) {
+                List<RestError> restErrors = objectMapper.readValue(
+                    httpExchange.getResponseContent(), new TypeReference<List<RestError>>() {
+                    });
+                return new SalesforceException(restErrors, httpExchange.getResponseStatus());
+            } else {
+                RestErrors errors = new RestErrors();
+                xStream.fromXML(httpExchange.getResponseContent(), errors);
+                return new SalesforceException(errors.getErrors(), httpExchange.getResponseStatus());
+            }
+        } catch (IOException e) {
+            // log and ignore
+            logErrorParseFailure(e);
+        } catch (RuntimeException e) {
+            // log and ignore
+            logErrorParseFailure(e);
+        }
+
+        // just report HTTP status info
+        return new SalesforceException("Unexpected error", httpExchange.getResponseStatus());
+    }
+
+    @Override
+    public void getVersions(final ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, servicesDataUrl());
+        // does not require authorization token
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getResources(ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl());
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getGlobalObjects(ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(""));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getBasicInfo(String sObjectName,
+                             ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/"));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getDescription(String sObjectName,
+                               ResponseCallback callback) {
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/describe/"));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getSObject(String sObjectName, String id, String[] fields,
+                           ResponseCallback callback) {
+
+        // parse fields if set: they become a comma-separated "fields" query parameter
+        String params = "";
+        if (fields != null && fields.length > 0) {
+            StringBuilder fieldsValue = new StringBuilder("?fields=");
+            for (int i = 0; i < fields.length; i++) {
+                if (i > 0) {
+                    fieldsValue.append(',');
+                }
+                fieldsValue.append(fields[i]);
+            }
+            params = fieldsValue.toString();
+        }
+        ContentExchange get = getContentExchange(HttpMethods.GET, sobjectsUrl(sObjectName + "/" + id + params));
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void createSObject(String sObjectName, InputStream sObject,
+                              ResponseCallback callback) {
+        // post the sObject
+        final ContentExchange post = getContentExchange(HttpMethods.POST, sobjectsUrl(sObjectName));
+
+        // authorization
+        setAccessToken(post);
+
+        // input stream as entity content
+        post.setRequestContentSource(sObject);
+        post.setRequestContentType(formatMediaType());
+
+        doHttpRequest(post, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void updateSObject(String sObjectName, String id, InputStream sObject,
+                              ResponseCallback callback) {
+        final ContentExchange patch = getContentExchange("PATCH", sobjectsUrl(sObjectName + "/" + id));
+        // requires authorization token
+        setAccessToken(patch);
+
+        // input stream as entity content
+        patch.setRequestContentSource(sObject);
+        patch.setRequestContentType(formatMediaType());
+
+        doHttpRequest(patch, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void deleteSObject(String sObjectName, String id,
+                              ResponseCallback callback) {
+        final ContentExchange delete = getContentExchange(HttpMethods.DELETE, sobjectsUrl(sObjectName + "/" + id));
+
+        // requires authorization token
+        setAccessToken(delete);
+
+        doHttpRequest(delete, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getSObjectWithId(String sObjectName, String fieldName, String fieldValue,
+                                 ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET,
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void upsertSObject(String sObjectName, String fieldName, String fieldValue, InputStream sObject,
+                              ResponseCallback callback) {
+        final ContentExchange patch = getContentExchange("PATCH",
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(patch);
+
+        // input stream as entity content
+        patch.setRequestContentSource(sObject);
+        // TODO will the encoding always be UTF-8??
+        patch.setRequestContentType(formatMediaType());
+
+        doHttpRequest(patch, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void deleteSObjectWithId(String sObjectName, String fieldName, String fieldValue,
+                                    ResponseCallback callback) {
+        final ContentExchange delete = getContentExchange(HttpMethods.DELETE,
+            sobjectsExternalIdUrl(sObjectName, fieldName, fieldValue));
+
+        // requires authorization token
+        setAccessToken(delete);
+
+        doHttpRequest(delete, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET,
+            sobjectsUrl(sObjectName + "/" + id + "/" + blobFieldName));
+        // TODO this doesn't seem to be required, the response is always the content binary stream
+        //get.setRequestHeader(HttpHeaders.ACCEPT_ENCODING, "base64");
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void query(String soqlQuery, ResponseCallback callback) {
+        try {
+
+            // encode before building the URL so failures surface in the original order
+            final String encodedQuery = percentEncode(soqlQuery);
+            final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "query/?q=" + encodedQuery);
+
+            // requires authorization token
+            setAccessToken(get);
+
+            doHttpRequest(get, new DelegatingClientCallback(callback));
+
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            callback.onResponse(null, new SalesforceException(msg, e));
+        }
+    }
+
+    @Override
+    public void queryMore(String nextRecordsUrl, ResponseCallback callback) {
+        final ContentExchange get = getContentExchange(HttpMethods.GET, instanceUrl + nextRecordsUrl);
+
+        // requires authorization token
+        setAccessToken(get);
+
+        doHttpRequest(get, new DelegatingClientCallback(callback));
+    }
+
+    @Override
+    public void search(String soslQuery, ResponseCallback callback) {
+        try {
+
+            // encode before building the URL so failures surface in the original order
+            final String encodedQuery = percentEncode(soslQuery);
+            final ContentExchange get = getContentExchange(HttpMethods.GET, versionUrl() + "search/?q=" + encodedQuery);
+
+            // requires authorization token
+            setAccessToken(get);
+
+            doHttpRequest(get, new DelegatingClientCallback(callback));
+
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            callback.onResponse(null, new SalesforceException(msg, e));
+        }
+    }
+
+    /** Returns {@code true} when this client was configured with the {@code "json"} format. */
+    private boolean isJsonFormat() {
+        return "json".equals(format);
+    }
+
+    /** Returns the media type constant matching {@code format}: JSON for "json", XML otherwise. */
+    private String formatMediaType() {
+        return isJsonFormat() ? APPLICATION_JSON_UTF8 : APPLICATION_XML_UTF8;
+    }
+
+    /** Logs a failure to parse an error response at WARN level, including the configured format. */
+    private void logErrorParseFailure(Exception e) {
+        String msg = "Unexpected Error parsing " + format + " error response: " + e.getMessage();
+        LOG.warn(msg, e);
+    }
+
+    /**
+     * URL-encodes a value as UTF-8 and replaces every {@code +} in the encoded text with {@code %20}.
+     *
+     * @throws UnsupportedEncodingException propagated from {@link URLEncoder#encode(String, String)}
+     */
+    private static String percentEncode(String value) throws UnsupportedEncodingException {
+        final String encoded = URLEncoder.encode(value, StringUtil.__UTF8_CHARSET.toString());
+        // URLEncoder likes to use '+' for spaces
+        return encoded.replace("+", "%20");
+    }
+
+    private String servicesDataUrl() {
+        return instanceUrl + SERVICES_DATA;
+    }
+
+    /**
+     * Returns the versioned root URL, ending in a slash.
+     *
+     * @throws IllegalArgumentException when no API version is set
+     */
+    private String versionUrl() {
+        if (version == null) {
+            throw new IllegalArgumentException("NULL API version", new NullPointerException("version"));
+        }
+        return servicesDataUrl() + "v" + version + "/";
+    }
+
+    /**
+     * Returns the {@code sobjects/} URL followed by the given path.
+     *
+     * @throws IllegalArgumentException when {@code sObjectName} is {@code null}
+     */
+    private String sobjectsUrl(String sObjectName) {
+        if (sObjectName == null) {
+            throw new IllegalArgumentException("Null SObject name", new NullPointerException("sObjectName"));
+        }
+        return versionUrl() + "sobjects/" + sObjectName;
+    }
+
+    /**
+     * Returns the URL addressing a record by external id; the field value is URL-encoded.
+     *
+     * @throws IllegalArgumentException when the field name or value is {@code null}, or
+     *                                  when encoding the value fails
+     */
+    private String sobjectsExternalIdUrl(String sObjectName, String fieldName, String fieldValue) {
+        if (fieldName == null || fieldValue == null) {
+            throw new IllegalArgumentException("External field name and value cannot be NULL");
+        }
+        try {
+            final String encodedValue = percentEncode(fieldValue);
+            return sobjectsUrl(sObjectName + "/" + fieldName + "/" + encodedValue);
+        } catch (UnsupportedEncodingException e) {
+            String msg = "Unexpected error: " + e.getMessage();
+            throw new IllegalArgumentException(msg, e);
+        }
+    }
+
+    /** Sets the {@code Authorization} header to {@code "Bearer "} followed by the access token. */
+    protected void setAccessToken(HttpExchange httpExchange) {
+        httpExchange.setRequestHeader(TOKEN_HEADER, TOKEN_PREFIX + accessToken);
+    }
+
+    /** Adapts a {@link ResponseCallback} to {@link ClientResponseCallback} by forwarding both arguments. */
+    private static class DelegatingClientCallback implements ClientResponseCallback {
+        private final ResponseCallback callback;
+
+        public DelegatingClientCallback(ResponseCallback callback) {
+            this.callback = callback;
+        }
+
+        @Override
+        public void onResponse(InputStream response, SalesforceException ex) {
+            callback.onResponse(response, ex);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
new file mode 100644
index 0000000..30186d9
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/RestClient.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+
+import java.io.InputStream;
+
+/**
+ * Asynchronous client operations for the Salesforce REST API. Every operation
+ * reports its outcome through a {@link ResponseCallback}.
+ */
+public interface RestClient {
+
+ /**
+ * Receives the outcome of an operation: the response stream and an exception.
+ */
+ public static interface ResponseCallback {
+ void onResponse(InputStream response, SalesforceException exception);
+ }
+
+ /**
+ * Lists summary information about each API version currently available,
+ * including the version, label, and a link to each version's root.
+ *
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getVersions(ResponseCallback callback);
+
+ /**
+ * Lists available resources for the specified API version, including resource name and URI.
+ *
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getResources(ResponseCallback callback);
+
+ /**
+ * Lists the available objects and their metadata for your organization's data.
+ *
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getGlobalObjects(ResponseCallback callback);
+
+ /**
+ * Describes the individual metadata for the specified object.
+ *
+ * @param sObjectName specified object name
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getBasicInfo(String sObjectName, ResponseCallback callback);
+
+ /**
+ * Completely describes the individual metadata at all levels for the specified object.
+ *
+ * @param sObjectName specified object name
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getDescription(String sObjectName, ResponseCallback callback);
+
+ /**
+ * Retrieves a record for the specified object ID.
+ *
+ * @param sObjectName specified object name
+ * @param id object id
+ * @param fields names of the fields to retrieve; may be null or empty, in which
+ * case DefaultRestClient sends no fields parameter
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getSObject(String sObjectName, String id, String[] fields, ResponseCallback callback);
+
+ /**
+ * Creates a record for the specified object.
+ *
+ * @param sObjectName specified object name
+ * @param sObject request entity
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void createSObject(String sObjectName, InputStream sObject, ResponseCallback callback);
+
+ /**
+ * Updates a record for the specified object ID.
+ *
+ * @param sObjectName specified object name
+ * @param id object id
+ * @param sObject request entity
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void updateSObject(String sObjectName, String id, InputStream sObject, ResponseCallback callback);
+
+ /**
+ * Deletes a record for the specified object ID.
+ *
+ * @param sObjectName specified object name
+ * @param id object id
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void deleteSObject(String sObjectName, String id, ResponseCallback callback);
+
+ /**
+ * Retrieves a record for the specified external ID.
+ *
+ * @param sObjectName specified object name
+ * @param fieldName external field name
+ * @param fieldValue external field value
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getSObjectWithId(String sObjectName, String fieldName, String fieldValue, ResponseCallback callback);
+
+ /**
+ * Creates or updates a record based on the value of a specified external ID field.
+ *
+ * @param sObjectName specified object name
+ * @param fieldName external field name
+ * @param fieldValue external field value
+ * @param sObject input object to insert or update
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void upsertSObject(String sObjectName,
+ String fieldName, String fieldValue, InputStream sObject, ResponseCallback callback);
+
+ /**
+ * Deletes a record based on the value of a specified external ID field.
+ *
+ * @param sObjectName specified object name
+ * @param fieldName external field name
+ * @param fieldValue external field value
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void deleteSObjectWithId(String sObjectName,
+ String fieldName, String fieldValue, ResponseCallback callback);
+
+
+ /**
+ * Retrieves the specified blob field from an individual record.
+ *
+ * @param sObjectName specified object name
+ * @param id object id
+ * @param blobFieldName name of the blob field to retrieve
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void getBlobField(String sObjectName, String id, String blobFieldName, ResponseCallback callback);
+
+/*
+ TODO
+ SObject User Password
+ /vXX.X/sobjects/User/user id/password
+ /vXX.X/sobjects/SelfServiceUser/self service user id/password
+
+ These methods set, reset, or get information about a user password.
+*/
+
+ /**
+ * Executes the specified SOQL query.
+ *
+ * @param soqlQuery SOQL query
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void query(String soqlQuery, ResponseCallback callback);
+
+ /**
+ * Get SOQL query results using nextRecordsUrl.
+ *
+ * @param nextRecordsUrl URL for next records to fetch, returned by query()
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void queryMore(String nextRecordsUrl, ResponseCallback callback);
+
+ /**
+ * Executes the specified SOSL search.
+ *
+ * @param soslQuery SOSL query
+ * @param callback {@link ResponseCallback} to handle response or exception
+ */
+ void search(String soslQuery, ResponseCallback callback);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
new file mode 100644
index 0000000..b17c5e1
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceExchange.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.ContentExchange;
+
+/**
+ * A {@link ContentExchange} that additionally carries a reference to an
+ * {@link AbstractClientBase}.
+ */
+public class SalesforceExchange extends ContentExchange {
+
+    /** Client attached to this exchange; {@code null} until {@link #setClient} is called. */
+    private AbstractClientBase owningClient;
+
+    /**
+     * @return the client last passed to {@link #setClient}, or {@code null} if none was set
+     */
+    public AbstractClientBase getClient() {
+        return this.owningClient;
+    }
+
+    /**
+     * @param client client to attach to this exchange
+     */
+    public void setClient(AbstractClientBase client) {
+        this.owningClient = client;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
new file mode 100644
index 0000000..5eec212
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityListener.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.eclipse.jetty.client.HttpDestination;
+import org.eclipse.jetty.client.HttpEventListenerWrapper;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.Buffer;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SalesforceSecurityListener extends HttpEventListenerWrapper {
+    // Listener wrapper that answers an HTTP 401 by logging in again through the session and resending the exchange.
+    private static final Logger LOG = LoggerFactory.getLogger(SalesforceSecurityListener.class);
+
+    private final HttpDestination destination;
+    private final HttpExchange exchange;
+    private final SalesforceSession session;
+    // retry bookkeeping for the wrapped exchange
+    private String currentToken;
+    private int retries;
+    private boolean retrying;
+    private boolean requestComplete;
+    private boolean responseComplete;
+    /** Wraps the exchange's existing event listener and records the given access token as the current one. */
+    public SalesforceSecurityListener(HttpDestination destination, HttpExchange exchange,
+                                      SalesforceSession session, String accessToken) {
+        super(exchange.getEventListener(), true);
+        this.destination = destination;
+        this.exchange = exchange;
+        this.session = session;
+        this.currentToken = accessToken;
+    }
+    /** On a 401, while retries are below the HTTP client's maxRetries, stops delegating and flags a retry. */
+    @Override
+    public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException {
+        if (status == HttpStatus.UNAUTHORIZED_401 &&
+            retries < destination.getHttpClient().maxRetries()) {
+
+            LOG.warn("Retrying on Salesforce authentication error [{}]: [{}]", status, reason);
+            setDelegatingRequests(false);
+            setDelegatingResponses(false);
+
+            retrying = true;
+        }
+        super.onResponseStatus(version, status, reason);
+    }
+    /** Records that the request finished; the event is passed on unless the exchange was just resent. */
+    @Override
+    public void onRequestComplete() throws IOException {
+        requestComplete = true;
+        if (checkExchangeComplete()) {
+            super.onRequestComplete();
+        }
+    }
+    /** Records that the response finished; the event is passed on unless the exchange was just resent. */
+    @Override
+    public void onResponseComplete() throws IOException {
+        responseComplete = true;
+        if (checkExchangeComplete()) {
+            super.onResponseComplete();
+        }
+    }
+    /** Logs in and resends a flagged exchange once request and response are both done; returns false if resent. */
+    private boolean checkExchangeComplete() throws IOException {
+        if (retrying && requestComplete && responseComplete) {
+            LOG.debug("Authentication Error, retrying: {}", exchange);
+
+            requestComplete = false;
+            responseComplete = false;
+
+            setDelegatingRequests(true);
+            setDelegatingResponses(true);
+
+            try {
+                // get a new token and retry
+                currentToken = session.login(currentToken);
+
+                if (exchange instanceof SalesforceExchange) {
+                    final SalesforceExchange salesforceExchange = (SalesforceExchange) exchange;
+                    final AbstractClientBase client = salesforceExchange.getClient();
+
+                    // update client cache for this and future requests
+                    client.setAccessToken(currentToken);
+                    client.setInstanceUrl(session.getInstanceUrl());
+                    client.setAccessToken(exchange);
+                } else {
+                    // replace (not add) the header, so the rejected token is not sent along with the new one
+                    exchange.setRequestHeader(HttpHeaders.AUTHORIZATION, "OAuth " + currentToken);
+                }
+
+                // TODO handle a change in Salesforce instanceUrl, right now we retry with the same destination
+                destination.resend(exchange);
+
+                // resending, exchange is not done
+                return false;
+
+            } catch (SalesforceException e) {
+                // logging here, since login exception is not propagated!
+                LOG.error(e.getMessage(), e);
+
+                // the HTTP status and reason is pushed up
+                setDelegationResult(false);
+            }
+        }
+
+        return true;
+    }
+    /** Counts a retry started by this listener and resets its state; the event is always forwarded. */
+    @Override
+    public void onRetry() {
+        // ignore retries from other interceptors
+        if (retrying) {
+            retrying = false;
+            retries++;
+
+            setDelegatingRequests(true);
+            setDelegatingResponses(true);
+
+            requestComplete = false;
+            responseComplete = false;
+        }
+        super.onRetry();
+    }
+    /** Re-enables delegation so that the connection failure reaches the wrapped listener. */
+    @Override
+    public void onConnectionFailed(Throwable ex) {
+        setDelegatingRequests(true);
+        setDelegatingResponses(true);
+        // delegate connection failures
+        super.onConnectionFailed(ex);
+    }
+    /** Re-enables delegation so that the exception reaches the wrapped listener. */
+    @Override
+    public void onException(Throwable ex) {
+        setDelegatingRequests(true);
+        setDelegatingResponses(true);
+        // delegate exceptions
+        super.onException(ex);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
new file mode 100644
index 0000000..0f567e6
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SyncResponseCallback.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import org.apache.camel.component.salesforce.api.SalesforceException;
+
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Blocking adapter for {@link RestClient.ResponseCallback}: stores the callback arguments and lets a caller wait for them.
+ */
+public class SyncResponseCallback implements RestClient.ResponseCallback {
+
+    private InputStream response;
+    private SalesforceException exception;
+    private CountDownLatch completion = new CountDownLatch(1);
+    /** Stores both arguments exactly as given, then counts down the current latch. */
+    @Override
+    public void onResponse(InputStream response, SalesforceException exception) {
+        this.exception = exception;
+        this.response = response;
+        completion.countDown();
+    }
+    /** Installs a fresh latch for another wait; the stored response and exception are left untouched. */
+    public void reset() {
+        this.completion = new CountDownLatch(1);
+    }
+    /** Waits up to the given duration; returns true if the latch was counted down, false on timeout. */
+    public boolean await(long duration, TimeUnit unit) throws InterruptedException {
+        return this.completion.await(duration, unit);
+    }
+    /** Returns the stream passed to the most recent {@link #onResponse} call (null before any call). */
+    public InputStream getResponse() {
+        return this.response;
+    }
+    /** Returns the exception passed to the most recent {@link #onResponse} call (null before any call). */
+    public SalesforceException getException() {
+        return this.exception;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
new file mode 100644
index 0000000..cec3ef8
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginError.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DTO for a Salesforce login error, holding an error string and an error description string.
+ */
+public class LoginError {
+ // Accessors for this field carry no @JsonProperty annotation.
+ private String error;
+ // Accessors for this field are annotated with the JSON name "error_description".
+ private String errorDescription;
+
+ public String getError() {
+ return error;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+ // Getter and setter both declare the same explicit JSON name.
+ @JsonProperty("error_description")
+ public String getErrorDescription() {
+ return errorDescription;
+ }
+
+ @JsonProperty("error_description")
+ public void setErrorDescription(String errorDescription) {
+ this.errorDescription = errorDescription;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
new file mode 100644
index 0000000..c23338e
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/LoginToken.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DTO for a Salesforce login token: access token, instance URL, id, signature and issue time.
+ */
+public class LoginToken {
+ // All values are kept as plain strings, including issuedAt.
+ private String accessToken;
+
+ private String instanceUrl;
+
+ private String id;
+
+ private String signature;
+
+ private String issuedAt;
+ // Accessors annotated with @JsonProperty use the snake_case JSON name given in the annotation.
+ @JsonProperty("access_token")
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ @JsonProperty("access_token")
+ public void setAccessToken(String accessToken) {
+ this.accessToken = accessToken;
+ }
+
+ @JsonProperty("instance_url")
+ public String getInstanceUrl() {
+ return instanceUrl;
+ }
+
+ @JsonProperty("instance_url")
+ public void setInstanceUrl(String instanceUrl) {
+ this.instanceUrl = instanceUrl;
+ }
+ // The id and signature accessors carry no @JsonProperty annotation.
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getSignature() {
+ return signature;
+ }
+
+ public void setSignature(String signature) {
+ this.signature = signature;
+ }
+
+ @JsonProperty("issued_at")
+ public String getIssuedAt() {
+ return issuedAt;
+ }
+
+ @JsonProperty("issued_at")
+ public void setIssuedAt(String issuedAt) {
+ this.issuedAt = issuedAt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/72a1767e/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
new file mode 100644
index 0000000..970b9aa
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/dto/NotifyForFieldsEnum.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.dto;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonValue;
+
+/**
+ * Values of the Salesforce NotifyForFields picklist, each paired with the string that represents it.
+ */
+public enum NotifyForFieldsEnum {
+
+    SELECT("Select"),
+    WHERE("Where"),
+    REFERENCED("Referenced"),
+    ALL("All");
+
+    final String value;
+
+    private NotifyForFieldsEnum(String value) {
+        this.value = value;
+    }
+    /** Returns the picklist string of this constant; annotated as its JSON value. */
+    @JsonValue
+    public String value() {
+        return value;
+    }
+    /** Returns the constant whose picklist string equals the argument exactly, else throws IllegalArgumentException. */
+    @JsonCreator
+    public static NotifyForFieldsEnum fromValue(String value) {
+        final NotifyForFieldsEnum[] constants = values();
+        for (int i = 0; i < constants.length; i++) {
+            if (constants[i].value.equals(value)) {
+                return constants[i];
+            }
+        }
+        throw new IllegalArgumentException(value);
+    }
+}
\ No newline at end of file