You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/05 17:12:50 UTC

[06/11] CAMEL-6428: camel-salesforce component. Thanks to Dhiraj Bokde for the contribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/PushTopicHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/PushTopicHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/PushTopicHelper.java
new file mode 100644
index 0000000..4e51281
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/PushTopicHelper.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import org.apache.camel.CamelException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.http.HttpStatus;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.CreateSObjectResult;
+import org.apache.camel.component.salesforce.internal.client.RestClient;
+import org.apache.camel.component.salesforce.internal.client.SyncResponseCallback;
+import org.apache.camel.component.salesforce.internal.dto.PushTopic;
+import org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class PushTopicHelper {
+    // logger for this helper
+    private static final Logger LOG = LoggerFactory.getLogger(PushTopicHelper.class);
+    // Jackson mapper used to marshal PushTopic DTOs and un-marshal REST responses
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    // SObject type name passed to the REST client when creating a topic
+    private static final String PUSH_TOPIC_OBJECT_NAME = "PushTopic";
+    private static final long API_TIMEOUT = 60; // REST API call timeout, in seconds (awaited with TimeUnit.SECONDS)
+    // endpoint configuration: supplies the SOQL query, notification settings, API version and updateTopic flag
+    private final SalesforceEndpointConfig config;
+    // name of the PushTopic this helper looks up, creates or updates
+    private final String topicName;
+    // client used for the query/create/update REST calls
+    private final RestClient restClient;
+
+    /**
+     * Creates a helper bound to a single PushTopic.
+     *
+     * @param config     endpoint configuration read when looking up, creating or updating the topic
+     * @param topicName  name of the PushTopic to manage
+     * @param restClient client used to issue the REST calls
+     */
+    public PushTopicHelper(SalesforceEndpointConfig config, String topicName, RestClient restClient) {
+        this.restClient = restClient;
+        this.topicName = topicName;
+        this.config = config;
+    }
+
+    /**
+     * Ensures that a PushTopic named {@code topicName} exists and matches the endpoint configuration.
+     * <p/>
+     * The topic is looked up with a SOQL query. When exactly one record is found, its query,
+     * notifyForFields and notifyForOperations values are compared with the configured ones and
+     * the topic is updated if they differ (only when {@code updateTopic} is enabled).
+     * In every other case a new topic is created.
+     *
+     * @throws CamelException if the lookup times out, fails or cannot be un-marshaled, if the
+     *                        calling thread is interrupted, if the existing topic differs while
+     *                        {@code updateTopic} is false, or if the create/update call fails
+     */
+    public void createOrUpdateTopic() throws CamelException {
+        final String query = config.getSObjectQuery();
+
+        final SyncResponseCallback callback = new SyncResponseCallback();
+        // lookup Topic first
+        try {
+            // use SOQL to lookup Topic, since Name is not an external ID!!!
+            // the name is escaped so that a quote or backslash in it cannot terminate
+            // or alter the SOQL string literal
+            restClient.query("SELECT Id, Name, Query, ApiVersion, IsActive, " +
+                "NotifyForFields, NotifyForOperations, Description " +
+                "FROM PushTopic WHERE Name = '" + escapeSoqlLiteral(topicName) + "'",
+                callback);
+
+            if (!callback.await(API_TIMEOUT, TimeUnit.SECONDS)) {
+                throw new SalesforceException("API call timeout!", null);
+            }
+            if (callback.getException() != null) {
+                throw callback.getException();
+            }
+            QueryRecordsPushTopic records = objectMapper.readValue(callback.getResponse(),
+                QueryRecordsPushTopic.class);
+            if (records.getTotalSize() == 1) {
+
+                PushTopic topic = records.getRecords().get(0);
+                LOG.info("Found existing topic {}: {}", topicName, topic);
+
+                // check if we need to update topic query, notifyForFields or notifyForOperations;
+                // the notify settings are only compared when they are configured
+                if (!query.equals(topic.getQuery()) ||
+                    (config.getNotifyForFields() != null &&
+                        !config.getNotifyForFields().equals(topic.getNotifyForFields())) ||
+                    (config.getNotifyForOperations() != null &&
+                        !config.getNotifyForOperations().equals(topic.getNotifyForOperations()))
+                    ) {
+
+                    if (!config.isUpdateTopic()) {
+                        String msg = "Query doesn't match existing Topic and updateTopic is set to false";
+                        throw new CamelException(msg);
+                    }
+
+                    // otherwise update the topic
+                    updateTopic(topic.getId());
+                }
+
+            } else {
+                createTopic();
+            }
+
+        } catch (SalesforceException e) {
+            throw new CamelException(
+                String.format("Error retrieving Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } catch (IOException e) {
+            throw new CamelException(
+                String.format("Un-marshaling error retrieving Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } catch (InterruptedException e) {
+            // reset interrupt status
+            Thread.currentThread().interrupt();
+            throw new CamelException(
+                String.format("Interrupted while retrieving Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } finally {
+            // close stream to close HttpConnection
+            if (callback.getResponse() != null) {
+                try {
+                    callback.getResponse().close();
+                } catch (IOException e) {
+                    // ignore, nothing useful can be done if closing the response fails
+                }
+            }
+        }
+    }
+
+    /**
+     * Escapes a value for use inside a single-quoted SOQL string literal by prefixing
+     * backslashes and single quotes with a backslash.
+     */
+    private static String escapeSoqlLiteral(String value) {
+        // String.valueOf keeps the previous rendering of a null name ("null")
+        return String.valueOf(value).replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+    /**
+     * Creates a new PushTopic named {@code topicName} from the endpoint configuration
+     * (API version, query, notifyForFields, notifyForOperations) and waits for the result.
+     *
+     * @throws CamelException if the call times out, fails, reports an unsuccessful result,
+     *                        cannot be (un-)marshaled, or the calling thread is interrupted
+     */
+    private void createTopic() throws CamelException {
+        final PushTopic topic = new PushTopic();
+        topic.setName(topicName);
+        topic.setApiVersion(Double.valueOf(config.getApiVersion()));
+        topic.setQuery(config.getSObjectQuery());
+        topic.setDescription("Topic created by Camel Salesforce component");
+        topic.setNotifyForFields(config.getNotifyForFields());
+        topic.setNotifyForOperations(config.getNotifyForOperations());
+
+        LOG.info("Creating Topic {}: {}", topicName, topic);
+        final SyncResponseCallback callback = new SyncResponseCallback();
+        try {
+            restClient.createSObject(PUSH_TOPIC_OBJECT_NAME,
+                new ByteArrayInputStream(objectMapper.writeValueAsBytes(topic)), callback);
+
+            if (!callback.await(API_TIMEOUT, TimeUnit.SECONDS)) {
+                throw new SalesforceException("API call timeout!", null);
+            }
+            if (callback.getException() != null) {
+                throw callback.getException();
+            }
+
+            CreateSObjectResult result = objectMapper.readValue(callback.getResponse(), CreateSObjectResult.class);
+            if (!result.getSuccess()) {
+                // the call itself succeeded but Salesforce rejected the new record
+                final SalesforceException salesforceException = new SalesforceException(
+                    result.getErrors(), HttpStatus.BAD_REQUEST_400);
+                throw new CamelException(
+                    String.format("Error creating Topic %s: %s", topicName, result.getErrors()),
+                    salesforceException);
+            }
+        } catch (SalesforceException e) {
+            throw new CamelException(
+                String.format("Error creating Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } catch (IOException e) {
+            throw new CamelException(
+                String.format("Un-marshaling error creating Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } catch (InterruptedException e) {
+            // reset interrupt status so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new CamelException(
+                String.format("Interrupted while creating Topic %s: %s", topicName, e.getMessage()),
+                e);
+        } finally {
+            // close stream to close HttpConnection
+            if (callback.getResponse() != null) {
+                try {
+                    callback.getResponse().close();
+                } catch (IOException e) {
+                    // ignore, nothing useful can be done if closing the response fails
+                }
+            }
+        }
+    }
+
+    /**
+     * Updates the query, notifyForFields and notifyForOperations of an existing PushTopic
+     * with the values from the endpoint configuration and waits for the call to complete.
+     *
+     * @param topicId id of the PushTopic record to update
+     * @throws CamelException if the call times out, fails, the payload cannot be marshaled,
+     *                        or the calling thread is interrupted
+     */
+    private void updateTopic(String topicId) throws CamelException {
+        final String query = config.getSObjectQuery();
+        LOG.info("Updating Topic {} with Query [{}]", topicName, query);
+
+        final SyncResponseCallback callback = new SyncResponseCallback();
+        try {
+            // update the query, notifyForFields and notifyForOperations fields
+            final PushTopic topic = new PushTopic();
+            topic.setQuery(query);
+            topic.setNotifyForFields(config.getNotifyForFields());
+            topic.setNotifyForOperations(config.getNotifyForOperations());
+
+            // same SObject type constant that createTopic() uses
+            restClient.updateSObject(PUSH_TOPIC_OBJECT_NAME, topicId,
+                new ByteArrayInputStream(objectMapper.writeValueAsBytes(topic)),
+                callback);
+
+            if (!callback.await(API_TIMEOUT, TimeUnit.SECONDS)) {
+                throw new SalesforceException("API call timeout!", null);
+            }
+            if (callback.getException() != null) {
+                throw callback.getException();
+            }
+
+        } catch (SalesforceException e) {
+            throw new CamelException(
+                String.format("Error updating topic %s with query [%s] : %s", topicName, query, e.getMessage()),
+                e);
+        } catch (InterruptedException e) {
+            // reset interrupt status
+            Thread.currentThread().interrupt();
+            throw new CamelException(
+                String.format("Error updating topic %s with query [%s] : %s", topicName, query, e.getMessage()),
+                e);
+        } catch (IOException e) {
+            throw new CamelException(
+                String.format("Error updating topic %s with query [%s] : %s", topicName, query, e.getMessage()),
+                e);
+        } finally {
+            // close stream to close HttpConnection
+            if (callback.getResponse() != null) {
+                try {
+                    callback.getResponse().close();
+                } catch (IOException ignore) {
+                    // ignore, nothing useful can be done if closing the response fails
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
new file mode 100644
index 0000000..b3bd50f
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Service;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.client.BayeuxClient;
+import org.cometd.client.transport.ClientTransport;
+import org.cometd.client.transport.LongPollingTransport;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.http.HttpHeaders;
+import org.eclipse.jetty.http.HttpSchemes;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.component.salesforce.internal.client.SalesforceSecurityListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.cometd.bayeux.Channel.*;
+import static org.cometd.bayeux.Message.ERROR_FIELD;
+import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
+
+public class SubscriptionHelper implements Service {
+    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
+
+    // seconds start() waits for the CometD client to reach the CONNECTED state
+    private static final int CONNECT_TIMEOUT = 110;
+    // seconds subscribe()/unsubscribe() wait for the confirmation message
+    private static final int CHANNEL_TIMEOUT = 40;
+
+    // message field read by the handshake listener to obtain a failure cause
+    private static final String EXCEPTION_FIELD = "exception";
+
+    private final SalesforceComponent component;
+    private final SalesforceSession session;
+    private final BayeuxClient client;
+
+    // message listener registered for each subscribed consumer
+    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
+
+    // volatile: start() and stop() are not synchronized, so the flag must be visible across callers
+    private volatile boolean started;
+    // meta channel listeners, created on first start() and re-used afterwards
+    private ClientSessionChannel.MessageListener handshakeListener;
+    private ClientSessionChannel.MessageListener connectListener;
+
+    // The fields below are written inside listener callbacks and read by start().
+    // The callbacks may run on a different thread than the caller of start(),
+    // so the fields are volatile to make those writes visible.
+    private volatile String handshakeError;
+    private volatile Exception handshakeException;
+    private volatile String connectError;
+    private volatile boolean reconnecting;
+
+    /**
+     * Creates the helper for the given component and builds the CometD client it will use.
+     *
+     * @param component component that supplies the session and configuration
+     * @throws Exception if the CometD client cannot be created
+     */
+    public SubscriptionHelper(SalesforceComponent component) throws Exception {
+        this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
+
+        this.component = component;
+        this.session = component.getSession();
+
+        // createClient() reads both component and session, so it has to run last
+        this.client = createClient();
+    }
+
+    /**
+     * Registers the handshake and connect listeners, starts the CometD handshake and waits up to
+     * {@code CONNECT_TIMEOUT} seconds for the client to reach the CONNECTED state.
+     * <p/>
+     * Calling this method on an already started helper does nothing. When the connection cannot
+     * be established the partial start is undone (client disconnected, listeners removed), so
+     * that the method can be called again.
+     *
+     * @throws Exception a {@link CamelException} describing the handshake exception, handshake
+     *                   error, connect error or timeout that prevented the connection
+     */
+    @Override
+    public void start() throws Exception {
+        if (started) {
+            // no need to start again
+            return;
+        }
+
+        // forget failures recorded by an earlier unsuccessful start(),
+        // otherwise a retry could report a stale cause
+        handshakeError = null;
+        handshakeException = null;
+        connectError = null;
+
+        // listener for handshake error or exception
+        if (handshakeListener == null) {
+            // first start
+            handshakeListener = new ClientSessionChannel.MessageListener() {
+                public void onMessage(ClientSessionChannel channel, Message message) {
+                    LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
+
+                    if (!message.isSuccessful()) {
+                        String error = (String) message.get(ERROR_FIELD);
+                        if (error != null) {
+                            handshakeError = error;
+                        }
+                        Exception exception = (Exception) message.get(EXCEPTION_FIELD);
+                        if (exception != null) {
+                            handshakeException = exception;
+                        }
+                    } else if (!listenerMap.isEmpty()) {
+                        // successful handshake while consumers are registered:
+                        // the next successful connect has to refresh their subscriptions
+                        reconnecting = true;
+                    }
+                }
+            };
+        }
+        client.getChannel(META_HANDSHAKE).addListener(handshakeListener);
+
+        // listener for connect error
+        if (connectListener == null) {
+            connectListener = new ClientSessionChannel.MessageListener() {
+                public void onMessage(ClientSessionChannel channel, Message message) {
+                    LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
+
+                    if (!message.isSuccessful()) {
+                        String error = (String) message.get(ERROR_FIELD);
+                        if (error != null) {
+                            connectError = error;
+                        }
+                    } else if (reconnecting) {
+
+                        reconnecting = false;
+
+                        LOG.debug("Refreshing subscriptions to {} channels on reconnect", listenerMap.size());
+                        // reconnected to Salesforce, subscribe to existing channels;
+                        // work on a copy because subscribe() puts entries back into listenerMap
+                        final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> map =
+                            new HashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>(listenerMap);
+                        listenerMap.clear();
+                        for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry :
+                            map.entrySet()) {
+                            final SalesforceConsumer consumer = entry.getKey();
+                            final String topicName = consumer.getTopicName();
+                            try {
+                                subscribe(topicName, consumer);
+                            } catch (CamelException e) {
+                                // let the consumer handle the exception
+                                consumer.handleException(
+                                    String.format("Error refreshing subscription to topic [%s]: %s",
+                                        topicName, e.getMessage()),
+                                    e);
+                            }
+                        }
+
+                    }
+                }
+            };
+        }
+        client.getChannel(META_CONNECT).addListener(connectListener);
+
+        // connect to Salesforce cometd endpoint
+        client.handshake();
+
+        final long waitMs = MILLISECONDS.convert(CONNECT_TIMEOUT, SECONDS);
+        if (!client.waitFor(waitMs, BayeuxClient.State.CONNECTED)) {
+            // pick the most specific cause that was recorded by the listeners
+            final CamelException failure;
+            if (handshakeException != null) {
+                failure = new CamelException(
+                    String.format("Exception during HANDSHAKE: %s", handshakeException.getMessage()),
+                    handshakeException);
+            } else if (handshakeError != null) {
+                failure = new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError));
+            } else if (connectError != null) {
+                failure = new CamelException(String.format("Error during CONNECT: %s", connectError));
+            } else {
+                failure = new CamelException(
+                    String.format("Handshake request timeout after %s seconds", CONNECT_TIMEOUT));
+            }
+
+            // undo the partial start: stop the client and remove the listeners,
+            // so that a later start() does not register them a second time
+            client.disconnect();
+            client.getChannel(META_CONNECT).removeListener(connectListener);
+            client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+
+            throw failure;
+        }
+
+        started = true;
+    }
+
+    /**
+     * Disconnects the CometD client and removes the connect and handshake listeners.
+     * Does nothing when the helper is not started.
+     */
+    @Override
+    public void stop() {
+        if (!started) {
+            // nothing to stop
+            return;
+        }
+
+        started = false;
+        // TODO find and log any disconnect errors
+        client.disconnect();
+        client.getChannel(META_CONNECT).removeListener(connectListener);
+        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+    }
+
+    private BayeuxClient createClient() throws Exception {
+        // use default Jetty client from SalesforceComponent, its shared by all consumers
+        final HttpClient httpClient = component.getConfig().getHttpClient();
+
+        Map<String, Object> options = new HashMap<String, Object>();
+        options.put(ClientTransport.TIMEOUT_OPTION, httpClient.getTimeout());
+
+        // check login access token
+        if (session.getAccessToken() == null) {
+            // lazy login here!
+            session.login(null);
+        }
+
+        LongPollingTransport transport = new LongPollingTransport(options, httpClient) {
+            @Override
+            protected void customize(ContentExchange exchange) {
+                super.customize(exchange);
+                // add SalesforceSecurityListener to handle token expiry
+                final String accessToken = session.getAccessToken();
+                try {
+                    final boolean isHttps = HttpSchemes.HTTPS.equals(String.valueOf(exchange.getScheme()));
+                    exchange.setEventListener(new SalesforceSecurityListener(
+                        httpClient.getDestination(exchange.getAddress(), isHttps),
+                        exchange, session, accessToken));
+                } catch (IOException e) {
+                    throw new RuntimeException(
+                        String.format("Error adding SalesforceSecurityListener to exchange %s", e.getMessage()),
+                        e);
+                }
+
+                // add current security token obtained from session
+                exchange.addRequestHeader(HttpHeaders.AUTHORIZATION,
+                "OAuth " + accessToken);
+            }
+        };
+
+        BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport);
+
+        client.setDebugEnabled(false);
+
+        return client;
+    }
+
+    public void subscribe(final String topicName, final SalesforceConsumer consumer) throws CamelException {
+        // create subscription for consumer
+        final String channelName = getChannelName(topicName);
+
+        // channel message listener
+        LOG.info("Subscribing to channel {}...", channelName);
+        final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener() {
+
+            @Override
+            public void onMessage(ClientSessionChannel channel, Message message) {
+                LOG.debug("Received Message: {}", message);
+                // convert CometD message to Camel Message
+                consumer.processMessage(channel, message);
+            }
+
+        };
+
+        final ClientSessionChannel clientChannel = client.getChannel(channelName);
+
+        // listener for subscribe error
+        final CountDownLatch latch = new CountDownLatch(1);
+        final String[] subscribeError = {null};
+        final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() {
+            public void onMessage(ClientSessionChannel channel, Message message) {
+                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
+                final String subscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString();
+                if (channelName.equals(subscribedChannelName)) {
+
+                    if (!message.isSuccessful()) {
+                        String error = (String) message.get(ERROR_FIELD);
+                        if (error != null) {
+                            subscribeError[0] = error;
+                        }
+                    } else {
+                        // remember subscription
+                        LOG.info("Subscribed to channel {}", subscribedChannelName);
+                    }
+                    latch.countDown();
+                }
+            }
+        };
+        client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
+
+        try {
+            clientChannel.subscribe(listener);
+
+            // confirm that a subscription was created
+            try {
+                if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
+                    String message;
+                    if (subscribeError[0] != null) {
+                        message = String.format("Error subscribing to topic %s: %s",
+                            topicName, subscribeError[0]);
+                    } else {
+                        message = String.format("Timeout error subscribing to topic %s after %s seconds",
+                            topicName, CHANNEL_TIMEOUT);
+                    }
+                    throw new CamelException(message);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // probably shutting down, so forget subscription
+            }
+
+            listenerMap.put(consumer, listener);
+
+        } finally {
+            client.getChannel(META_SUBSCRIBE).removeListener(subscriptionListener);
+        }
+    }
+
+    private String getChannelName(String topicName) {
+        return "/topic/" + topicName;
+    }
+
+    public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
+
+        // channel name
+        final String channelName = getChannelName(topicName);
+
+        // listen for unsubscribe error
+        final CountDownLatch latch = new CountDownLatch(1);
+        final String[] unsubscribeError = {null};
+        final ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener() {
+            public void onMessage(ClientSessionChannel channel, Message message) {
+                LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
+                String unsubscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString();
+                if (channelName.equals(unsubscribedChannelName)) {
+
+                    if (!message.isSuccessful()) {
+                        String error = (String) message.get(ERROR_FIELD);
+                        if (error != null) {
+                            unsubscribeError[0] = error;
+                        }
+                    } else {
+                        // forget subscription
+                        LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
+                    }
+                    latch.countDown();
+                }
+            }
+        };
+        client.getChannel(META_UNSUBSCRIBE).addListener(unsubscribeListener);
+
+        try {
+            // unsubscribe from channel
+            final ClientSessionChannel.MessageListener listener = listenerMap.remove(consumer);
+            if (listener != null) {
+
+                LOG.info("Unsubscribing from channel {}...", channelName);
+                final ClientSessionChannel clientChannel = client.getChannel(channelName);
+                clientChannel.unsubscribe(listener);
+
+                // confirm unsubscribe
+                try {
+                    if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
+                        String message;
+                        if (unsubscribeError[0] != null) {
+                            message = String.format("Error unsubscribing from topic %s: %s",
+                                topicName, unsubscribeError[0]);
+                        } else {
+                            message = String.format("Timeout error unsubscribing from topic %s after %s seconds",
+                                topicName, CHANNEL_TIMEOUT);
+                        }
+                        throw new CamelException(message);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    // probably shutting down, forget unsubscribe and return
+                }
+
+            }
+        } finally {
+            client.getChannel(META_UNSUBSCRIBE).removeListener(unsubscribeListener);
+        }
+    }
+
+    /**
+     * Returns the CometD endpoint URL: the session's instance URL followed by
+     * {@code /cometd/} and the configured API version.
+     */
+    public String getEndpointUrl() {
+        final StringBuilder url = new StringBuilder();
+        url.append(component.getSession().getInstanceUrl());
+        url.append("/cometd/");
+        url.append(component.getConfig().getApiVersion());
+        return url.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/main/resources/META-INF/services/org/apache/camel/component/salesforce
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/resources/META-INF/services/org/apache/camel/component/salesforce b/components/camel-salesforce/camel-salesforce-component/src/main/resources/META-INF/services/org/apache/camel/component/salesforce
new file mode 100644
index 0000000..ad8d689
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/resources/META-INF/services/org/apache/camel/component/salesforce
@@ -0,0 +1 @@
+class=org.apache.camel.component.salesforce.SalesforceComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractBulkApiTestBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractBulkApiTestBase.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractBulkApiTestBase.java
new file mode 100644
index 0000000..c48d143
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractBulkApiTestBase.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.salesforce.api.dto.bulk.BatchInfo;
+import org.apache.camel.component.salesforce.api.dto.bulk.BatchStateEnum;
+import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo;
+import org.junit.experimental.theories.Theories;
+import org.junit.runner.RunWith;
+
+/**
+ * Base class for the Bulk API integration tests. It registers one
+ * {@code direct:} route per Bulk API operation and offers the helpers
+ * shared by the job, batch and query tests.
+ */
+@RunWith(Theories.class)
+public abstract class AbstractBulkApiTestBase extends AbstractSalesforceTestBase {
+
+    /**
+     * Submits a job through {@code direct:createJob}.
+     *
+     * @param jobInfo definition of the job to create
+     * @return the job returned by the route; it is asserted to be non-null and to carry an id
+     */
+    protected JobInfo createJob(JobInfo jobInfo) throws InterruptedException {
+        final JobInfo created = template().requestBody("direct:createJob", jobInfo, JobInfo.class);
+        // report an empty reply as a test failure instead of a NullPointerException
+        assertNotNull("Null job", created);
+        assertNotNull("Missing JobId", created.getId());
+        return created;
+    }
+
+    /**
+     * Routes every {@code direct:} endpoint used by the tests to the
+     * salesforce operation of the same name.
+     */
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // job operations
+                from("direct:createJob").to("salesforce://createJob");
+                from("direct:getJob").to("salesforce:getJob");
+                from("direct:closeJob").to("salesforce:closeJob");
+                from("direct:abortJob").to("salesforce:abortJob");
+
+                // batch operations
+                from("direct:createBatch").to("salesforce:createBatch");
+                from("direct:getBatch").to("salesforce:getBatch");
+                from("direct:getAllBatches").to("salesforce:getAllBatches");
+                from("direct:getRequest").to("salesforce:getRequest");
+                from("direct:getResults").to("salesforce:getResults");
+
+                // query operations; %25 is the URI-escaped '%' wildcard of the LIKE clause
+                from("direct:createBatchQuery").
+                    to("salesforce:createBatchQuery?sObjectQuery=SELECT Name, Description__c, Price__c, Total_Inventory__c FROM Merchandise__c WHERE Name LIKE '%25Bulk API%25'");
+                from("direct:getQueryResultIds").to("salesforce:getQueryResultIds");
+                from("direct:getQueryResult").to("salesforce:getQueryResult");
+            }
+        };
+    }
+
+    /**
+     * Tells whether a batch has left the waiting states.
+     *
+     * @param batchInfo batch to inspect
+     * @return {@code false} while the batch is QUEUED or IN_PROGRESS, {@code true} for any other state
+     */
+    protected boolean batchProcessed(BatchInfo batchInfo) {
+        final BatchStateEnum state = batchInfo.getState();
+        return state != BatchStateEnum.QUEUED && state != BatchStateEnum.IN_PROGRESS;
+    }
+
+    /**
+     * Fetches the current state of a batch through {@code direct:getBatch}.
+     *
+     * @param batchInfo batch to look up
+     * @return the refreshed batch info; it is asserted to be non-null and to carry an id
+     */
+    protected BatchInfo getBatchInfo(BatchInfo batchInfo) throws InterruptedException {
+        final BatchInfo refreshed = template().requestBody("direct:getBatch", batchInfo, BatchInfo.class);
+        assertNotNull("Null batch", refreshed);
+        assertNotNull("Null batch id", refreshed.getId());
+        return refreshed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractSalesforceTestBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractSalesforceTestBase.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractSalesforceTestBase.java
new file mode 100644
index 0000000..3130a5b
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/AbstractSalesforceTestBase.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+
+import java.io.IOException;
+
+/**
+ * Common base for the Salesforce integration tests. It adds a configured
+ * {@code salesforce} component to the Camel context before the routes of
+ * the concrete test are built.
+ */
+public abstract class AbstractSalesforceTestBase extends CamelTestSupport {
+
+    /**
+     * @return {@code true}, so that the Camel context is created only once per test class
+     */
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        return true;
+    }
+
+    /**
+     * Registers the test component, then asks the subclass for its routes.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        createComponent();
+        return doCreateRouteBuilder();
+    }
+
+    /**
+     * @return the routes needed by the concrete test class
+     */
+    protected abstract RouteBuilder doCreateRouteBuilder() throws Exception;
+
+    /**
+     * Creates the component under test and adds it to the context under the name
+     * {@code salesforce}. The API version is taken from the {@code apiVersion} system
+     * property, falling back to {@code SalesforceEndpointConfig.DEFAULT_VERSION}; the
+     * login settings come from {@link LoginConfigHelper#getLoginConfig()}.
+     */
+    protected void createComponent() throws IllegalAccessException, IOException {
+        final SalesforceComponent salesforce = new SalesforceComponent();
+
+        final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig();
+        endpointConfig.setApiVersion(System.getProperty("apiVersion", SalesforceEndpointConfig.DEFAULT_VERSION));
+        salesforce.setConfig(endpointConfig);
+        salesforce.setLoginConfig(LoginConfigHelper.getLoginConfig());
+
+        // the DTO package is the one that holds Merchandise__c
+        final String dtoPackage = Merchandise__c.class.getPackage().getName();
+        salesforce.setPackages(new String[] {dtoPackage});
+
+        context().addComponent("salesforce", salesforce);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiBatchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiBatchIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiBatchIntegrationTest.java
new file mode 100644
index 0000000..bed6a0c
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiBatchIntegrationTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test that takes one batch per content type through the whole
+ * Bulk API batch lifecycle.
+ */
+public class BulkApiBatchIntegrationTest extends AbstractBulkApiTestBase {
+    private static final String TEST_REQUEST_XML = "/test-request.xml";
+    private static final String TEST_REQUEST_CSV = "/test-request.csv";
+
+    // pause between two status polls and upper bound for the whole wait, in milliseconds
+    private static final long POLL_INTERVAL_MS = 5000L;
+    private static final long POLL_TIMEOUT_MS = 10 * 60 * 1000L;
+
+    /**
+     * @return one XML and one CSV batch request, each backed by a classpath test resource
+     */
+    @DataPoints
+    public static BatchTest[] getBatches() {
+        // TODO test ZIP_XML and ZIP_CSV
+        return new BatchTest[] {
+            new BatchTest(ContentType.XML, TEST_REQUEST_XML),
+            new BatchTest(ContentType.CSV, TEST_REQUEST_CSV)
+        };
+    }
+
+    /**
+     * Creates an UPSERT job, submits the request as a batch, waits for the batch to
+     * complete and reads back its request and results before closing the job.
+     *
+     * @param request batch content and the content type it is written in
+     */
+    @Theory
+    public void testBatchLifecycle(BatchTest request) throws Exception {
+        log.info("Testing Batch lifecycle with {} content", request.contentType);
+        assertNotNull("Missing test resource for " + request.contentType, request.stream);
+
+        // create an UPSERT test Job for this batch request
+        JobInfo jobInfo = new JobInfo();
+        jobInfo.setOperation(OperationEnum.UPSERT);
+        jobInfo.setContentType(request.contentType);
+        jobInfo.setObject(Merchandise__c.class.getSimpleName());
+        jobInfo.setExternalIdFieldName("Name");
+        jobInfo = createJob(jobInfo);
+
+        // test createBatch
+        final Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(SalesforceEndpointConfig.JOB_ID, jobInfo.getId());
+        headers.put(SalesforceEndpointConfig.CONTENT_TYPE, jobInfo.getContentType());
+        BatchInfo batchInfo = template().requestBodyAndHeaders("direct:createBatch",
+            request.stream, headers, BatchInfo.class);
+        assertNotNull("Null batch", batchInfo);
+        assertNotNull("Null batch id", batchInfo.getId());
+
+        // test getAllBatches
+        @SuppressWarnings("unchecked")
+        final List<BatchInfo> batches = template().requestBody("direct:getAllBatches", jobInfo, List.class);
+        assertNotNull("Null batches", batches);
+        assertFalse("Empty batch list", batches.isEmpty());
+
+        // test getBatch
+        batchInfo = getBatchInfo(batches.get(0));
+
+        // test getRequest
+        final InputStream requestStream = template().requestBody("direct:getRequest", batchInfo, InputStream.class);
+        assertNotNull("Null batch request", requestStream);
+
+        // wait for the batch to finish, but give up after the timeout instead of hanging the build
+        log.info("Waiting for batch to finish...");
+        final long deadline = System.currentTimeMillis() + POLL_TIMEOUT_MS;
+        while (!batchProcessed(batchInfo)) {
+            assertTrue("Timed out waiting for batch " + batchInfo.getId(),
+                System.currentTimeMillis() < deadline);
+            Thread.sleep(POLL_INTERVAL_MS);
+            batchInfo = getBatchInfo(batchInfo);
+        }
+        log.info("Batch finished with state {}", batchInfo.getState());
+        assertEquals("Batch did not succeed", BatchStateEnum.COMPLETED, batchInfo.getState());
+
+        // test getResults
+        final InputStream results = template().requestBody("direct:getResults", batchInfo, InputStream.class);
+        assertNotNull("Null batch results", results);
+
+        // close the test job
+        template().requestBody("direct:closeJob", jobInfo, JobInfo.class);
+    }
+
+    /** A batch request body together with the content type it is written in. */
+    private static class BatchTest {
+        public InputStream stream;
+        public ContentType contentType;
+
+        BatchTest(ContentType contentType, String resource) {
+            this.contentType = contentType;
+            this.stream = AbstractBulkApiTestBase.class.getResourceAsStream(resource);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiJobIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiJobIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiJobIntegrationTest.java
new file mode 100644
index 0000000..9010aa4
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiJobIntegrationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.component.salesforce.api.dto.bulk.ContentType;
+import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo;
+import org.apache.camel.component.salesforce.api.dto.bulk.JobStateEnum;
+import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration test for the Bulk API job operations: every job supplied by
+ * {@link #getJobs()} is created, fetched, closed and finally aborted, and
+ * the reported job state is checked along the way.
+ */
+public class BulkApiJobIntegrationTest extends AbstractBulkApiTestBase {
+
+    /**
+     * Supplies the jobs for {@link #testJobLifecycle(JobInfo)}.
+     *
+     * @return one {@code Merchandise__c} job per tested operation and content type
+     */
+    @DataPoints
+    public static JobInfo[] getJobs() {
+        // the upsert job additionally names its external id field
+        final JobInfo upsertCsv = newJob(ContentType.CSV, OperationEnum.UPSERT);
+        upsertCsv.setExternalIdFieldName("Name");
+
+        return new JobInfo[] {
+            newJob(ContentType.XML, OperationEnum.INSERT),
+            newJob(ContentType.CSV, OperationEnum.INSERT),
+            newJob(ContentType.CSV, OperationEnum.UPDATE),
+            upsertCsv,
+            newJob(ContentType.CSV, OperationEnum.DELETE),
+            newJob(ContentType.CSV, OperationEnum.HARD_DELETE),
+            newJob(ContentType.CSV, OperationEnum.QUERY)
+        };
+    }
+
+    /**
+     * Builds a job definition for the {@code Merchandise__c} object.
+     */
+    private static JobInfo newJob(ContentType contentType, OperationEnum operation) {
+        final JobInfo job = new JobInfo();
+        job.setObject(Merchandise__c.class.getSimpleName());
+        job.setContentType(contentType);
+        job.setOperation(operation);
+        return job;
+    }
+
+    /**
+     * Creates the job, then checks that it is OPEN when fetched, CLOSED after
+     * {@code closeJob} and ABORTED after {@code abortJob}.
+     */
+    @Theory
+    public void testJobLifecycle(JobInfo jobInfo) throws Exception {
+        log.info("Testing Job lifecycle for {} of type {}", jobInfo.getOperation(), jobInfo.getContentType());
+
+        final JobInfo created = createJob(jobInfo);
+
+        final JobInfo fetched = template().requestBody("direct:getJob", created, JobInfo.class);
+        assertSame("Job should be OPEN", JobStateEnum.OPEN, fetched.getState());
+
+        final JobInfo closed = template().requestBody("direct:closeJob", fetched, JobInfo.class);
+        assertSame("Job should be CLOSED", JobStateEnum.CLOSED, closed.getState());
+
+        final JobInfo aborted = template().requestBody("direct:abortJob", closed, JobInfo.class);
+        assertSame("Job should be ABORTED", JobStateEnum.ABORTED, aborted.getState());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiQueryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiQueryIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiQueryIntegrationTest.java
new file mode 100644
index 0000000..0f22ee1
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/BulkApiQueryIntegrationTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.component.salesforce.api.dto.bulk.*;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+import org.junit.experimental.theories.DataPoints;
+import org.junit.experimental.theories.Theory;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Integration test that runs a Bulk API query job for each content type and
+ * reads back every result set it produces.
+ */
+public class BulkApiQueryIntegrationTest extends AbstractBulkApiTestBase {
+
+    // pause between two status polls and upper bound for the whole wait, in milliseconds
+    private static final long POLL_INTERVAL_MS = 5000L;
+    private static final long POLL_TIMEOUT_MS = 10 * 60 * 1000L;
+
+    /**
+     * @return the content types the query lifecycle is tested with
+     */
+    @DataPoints
+    public static ContentType[] getContentTypes() {
+        return new ContentType[] {
+            ContentType.XML,
+            ContentType.CSV
+        };
+    }
+
+    /**
+     * Creates a QUERY job, submits the query batch, waits for it to complete and
+     * then fetches each query result before closing the job.
+     *
+     * @param contentType content type of the job under test
+     */
+    @Theory
+    public void testQueryLifecycle(ContentType contentType) throws Exception {
+        log.info("Testing Query lifecycle with {} content", contentType);
+
+        // create a QUERY test Job
+        JobInfo jobInfo = new JobInfo();
+        jobInfo.setOperation(OperationEnum.QUERY);
+        jobInfo.setContentType(contentType);
+        jobInfo.setObject(Merchandise__c.class.getSimpleName());
+        jobInfo = createJob(jobInfo);
+
+        // test createQuery
+        BatchInfo batchInfo = template().requestBody("direct:createBatchQuery", jobInfo, BatchInfo.class);
+        assertNotNull("Null batch query", batchInfo);
+        assertNotNull("Null batch query id", batchInfo.getId());
+
+        // test getRequest
+        final InputStream requestStream = template().requestBody("direct:getRequest", batchInfo, InputStream.class);
+        assertNotNull("Null batch request", requestStream);
+
+        // wait for the batch to finish, but give up after the timeout instead of hanging the build
+        log.info("Waiting for query batch to finish...");
+        final long deadline = System.currentTimeMillis() + POLL_TIMEOUT_MS;
+        while (!batchProcessed(batchInfo)) {
+            assertTrue("Timed out waiting for query batch " + batchInfo.getId(),
+                System.currentTimeMillis() < deadline);
+            Thread.sleep(POLL_INTERVAL_MS);
+            batchInfo = getBatchInfo(batchInfo);
+        }
+        log.info("Query finished with state {}", batchInfo.getState());
+        assertEquals("Query did not succeed", BatchStateEnum.COMPLETED, batchInfo.getState());
+
+        // test getQueryResultList
+        @SuppressWarnings("unchecked")
+        final List<String> resultIds = template().requestBody("direct:getQueryResultIds", batchInfo, List.class);
+        assertNotNull("Null query result ids", resultIds);
+        assertFalse("Empty result ids", resultIds.isEmpty());
+
+        // test getQueryResult
+        for (String resultId : resultIds) {
+            final InputStream results = template().requestBodyAndHeader("direct:getQueryResult", batchInfo,
+                SalesforceEndpointConfig.RESULT_ID, resultId, InputStream.class);
+            assertNotNull("Null query result", results);
+        }
+
+        // close the test job
+        template().requestBody("direct:closeJob", jobInfo, JobInfo.class);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/LoginConfigHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/LoginConfigHelper.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/LoginConfigHelper.java
new file mode 100644
index 0000000..1f8b480
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/LoginConfigHelper.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.junit.Assert;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Loads the Salesforce login settings for the integration tests from a
+ * properties file that is resolved against the working directory.
+ */
+public class LoginConfigHelper extends Assert {
+
+    private static final String TEST_LOGIN_PROPERTIES = "test-salesforce-login.properties";
+
+    /**
+     * Reads {@code test-salesforce-login.properties} and builds the login configuration from it.
+     * {@code loginUrl} and {@code lazyLogin} are optional; the remaining settings are
+     * asserted to be present.
+     *
+     * @return the login configuration described by the properties file
+     * @throws IllegalArgumentException if the properties file cannot be opened
+     * @throws IOException if reading the properties file fails
+     */
+    public static SalesforceLoginConfig getLoginConfig() throws IllegalAccessException, IOException {
+        final InputStream stream;
+        try {
+            // the constructor never returns null: a missing file surfaces as an exception
+            stream = new FileInputStream(TEST_LOGIN_PROPERTIES);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("Create a properties file named " +
+                TEST_LOGIN_PROPERTIES + " with clientId, clientSecret, userName, and password" +
+                " for a Salesforce account with the Merchandise object from Salesforce Guides.", e);
+        }
+
+        // load test-salesforce-login properties, releasing the file handle in any case
+        final Properties properties = new Properties();
+        try {
+            properties.load(stream);
+        } finally {
+            stream.close();
+        }
+
+        final SalesforceLoginConfig config = new SalesforceLoginConfig(
+            properties.getProperty("loginUrl", SalesforceLoginConfig.DEFAULT_LOGIN_URL),
+            properties.getProperty("clientId"),
+            properties.getProperty("clientSecret"),
+            properties.getProperty("userName"),
+            properties.getProperty("password"),
+            Boolean.parseBoolean(properties.getProperty("lazyLogin", "false")));
+
+        assertNotNull("Null loginUrl", config.getLoginUrl());
+        assertNotNull("Null clientId", config.getClientId());
+        assertNotNull("Null clientSecret", config.getClientSecret());
+        assertNotNull("Null userName", config.getUserName());
+        assertNotNull("Null password", config.getPassword());
+
+        return config;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
new file mode 100644
index 0000000..a33d17d
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RestApiIntegrationTest.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.salesforce.api.dto.*;
+import org.apache.camel.component.salesforce.dto.Document;
+import org.apache.camel.component.salesforce.dto.Line_Item__c;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+import org.apache.camel.component.salesforce.dto.QueryRecordsLine_Item__c;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.List;
+
+public class RestApiIntegrationTest extends AbstractSalesforceTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RestApiIntegrationTest.class);
+    private static final String TEST_LINE_ITEM_ID = "1";
+    private static final String NEW_LINE_ITEM_ID = "100";
+    private static final String TEST_DOCUMENT_ID = "Test Document";
+
+    private static String testId;
+
+    /**
+     * Runs the getVersions check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetVersions() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetVersions(suffix);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doTestGetVersions(String suffix) throws Exception {
+        // test getVersions doesn't need a body
+        // assert expected result
+        Object o = template().requestBody("direct:getVersions" + suffix, (Object) null);
+        List<Version> versions = null;
+        if (o instanceof Versions) {
+            versions = ((Versions) o).getVersions();
+        } else {
+            versions = (List<Version>) o;
+        }
+        assertNotNull(versions);
+        LOG.debug("Versions: {}", versions);
+    }
+
+    /**
+     * Runs the getResources check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetResources() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetResources(suffix);
+        }
+    }
+
+    private void doTestGetResources(String suffix) throws Exception {
+
+
+        RestResources resources = template().requestBody("direct:getResources" + suffix, null, RestResources.class);
+        assertNotNull(resources);
+        LOG.debug("Resources: {}", resources);
+    }
+
+    /**
+     * Runs the getGlobalObjects check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetGlobalObjects() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetGlobalObjects(suffix);
+        }
+    }
+
+    private void doTestGetGlobalObjects(String suffix) throws Exception {
+
+
+        GlobalObjects globalObjects = template().requestBody("direct:getGlobalObjects" + suffix, null, GlobalObjects.class);
+        assertNotNull(globalObjects);
+        LOG.debug("GlobalObjects: {}", globalObjects);
+    }
+
+    /**
+     * Runs the getBasicInfo check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetBasicInfo() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetBasicInfo(suffix);
+        }
+    }
+
+    private void doTestGetBasicInfo(String suffix) throws Exception {
+        SObjectBasicInfo objectBasicInfo = template().requestBody("direct:getBasicInfo" + suffix, null, SObjectBasicInfo.class);
+        assertNotNull(objectBasicInfo);
+        LOG.debug("SObjectBasicInfo: {}", objectBasicInfo);
+
+        // set test Id for testGetSObject
+        assertFalse("RecentItems is empty", objectBasicInfo.getRecentItems().isEmpty());
+        testId = objectBasicInfo.getRecentItems().get(0).getId();
+    }
+
+    /**
+     * Runs the getDescription check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetDescription() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetDescription(suffix);
+        }
+    }
+
+    private void doTestGetDescription(String suffix) throws Exception {
+
+
+        SObjectDescription sObjectDescription = template().requestBody("direct:getDescription" + suffix, null, SObjectDescription.class);
+        assertNotNull(sObjectDescription);
+        LOG.debug("SObjectDescription: {}", sObjectDescription);
+    }
+
+    /**
+     * Runs the getSObject check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testGetSObject() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetSObject(suffix);
+        }
+    }
+
+    private void doTestGetSObject(String suffix) throws Exception {
+        if (testId == null) {
+            // execute getBasicInfo to get test id from recent items
+            doTestGetBasicInfo("");
+        }
+
+        Merchandise__c merchandise = template().requestBody("direct:getSObject" + suffix, testId, Merchandise__c.class);
+        assertNotNull(merchandise);
+        if (suffix.isEmpty()) {
+            assertNull(merchandise.getTotal_Inventory__c());
+            assertNotNull(merchandise.getPrice__c());
+        } else {
+            assertNotNull(merchandise.getTotal_Inventory__c());
+            assertNull(merchandise.getPrice__c());
+        }
+        LOG.debug("SObjectById: {}", merchandise);
+    }
+
+    /**
+     * Runs the create/update/delete round trip against the routes without a format option
+     * and against the {@code format=xml} routes.
+     */
+    @Test
+    public void testCreateUpdateDelete() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestCreateUpdateDelete(suffix);
+        }
+    }
+
+    /**
+     * Creates a {@link Merchandise__c} record, updates its price and inventory, and deletes it
+     * again, using the {@code direct:CreateSObject}, {@code direct:UpdateSObject} and
+     * {@code direct:deleteSObject} routes with the given suffix appended.
+     *
+     * @param suffix route name suffix, either empty or {@code "Xml"}
+     */
+    private void doTestCreateUpdateDelete(String suffix) throws InterruptedException {
+        Merchandise__c merchandise__c = new Merchandise__c();
+        merchandise__c.setName("Wee Wee Wee Plane");
+        merchandise__c.setDescription__c("Microlite plane");
+        merchandise__c.setPrice__c(2000.0);
+        merchandise__c.setTotal_Inventory__c(50.0);
+        CreateSObjectResult result = template().requestBody("direct:CreateSObject" + suffix,
+            merchandise__c, CreateSObjectResult.class);
+        assertNotNull(result);
+        assertTrue("Create success", result.getSuccess());
+        // parameterized logging, consistent with the rest of this class
+        LOG.debug("Create: {}", result);
+
+        // test update
+        // make the plane cheaper
+        merchandise__c.setPrice__c(1500.0);
+        // change inventory to half
+        merchandise__c.setTotal_Inventory__c(25.0);
+        // also need to set the Id
+        merchandise__c.setId(result.getId());
+
+        // the update route is expected to return no body
+        assertNull(template().requestBodyAndHeader("direct:UpdateSObject" + suffix,
+            merchandise__c, SalesforceEndpointConfig.SOBJECT_ID, result.getId()));
+        LOG.debug("Update successful");
+
+        // delete the newly created SObject; the delete route is expected to return no body
+        assertNull(template().requestBody("direct:deleteSObject" + suffix, result.getId()));
+        LOG.debug("Delete successful");
+    }
+
+    /**
+     * Runs the external-id upsert/delete round trip against the routes without a format option
+     * and against the {@code format=xml} routes.
+     */
+    @Test
+    public void testCreateUpdateDeleteWithId() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestCreateUpdateDeleteWithId(suffix);
+        }
+    }
+
+    private void doTestCreateUpdateDeleteWithId(String suffix) throws InterruptedException {
+        // get line item with Name 1
+        Line_Item__c line_item__c = template().requestBody("direct:getSObjectWithId" + suffix, TEST_LINE_ITEM_ID,
+            Line_Item__c.class);
+        assertNotNull(line_item__c);
+        LOG.debug("GetWithId: {}", line_item__c);
+
+        // test insert with id
+        // set the unit price and sold
+        line_item__c.setUnit_Price__c(1000.0);
+        line_item__c.setUnits_Sold__c(50.0);
+        // update line item with Name NEW_LINE_ITEM_ID
+        line_item__c.setName(NEW_LINE_ITEM_ID);
+
+        CreateSObjectResult result = template().requestBodyAndHeader("direct:upsertSObject" + suffix,
+            line_item__c, SalesforceEndpointConfig.SOBJECT_EXT_ID_VALUE, NEW_LINE_ITEM_ID,
+            CreateSObjectResult.class);
+        assertNotNull(result);
+        assertTrue(result.getSuccess());
+        LOG.debug("CreateWithId: {}", result);
+
+        // clear read only parent type fields
+        line_item__c.setInvoice_Statement__c(null);
+        line_item__c.setMerchandise__c(null);
+        // change the units sold
+        line_item__c.setUnits_Sold__c(25.0);
+
+        // update line item with Name NEW_LINE_ITEM_ID
+        result = template().requestBodyAndHeader("direct:upsertSObject" + suffix,
+            line_item__c, SalesforceEndpointConfig.SOBJECT_EXT_ID_VALUE, NEW_LINE_ITEM_ID,
+            CreateSObjectResult.class);
+        assertNull(result);
+        LOG.debug("UpdateWithId: {}", result);
+
+        // delete the SObject with Name NEW_LINE_ITEM_ID
+        assertNull(template().requestBody("direct:deleteSObjectWithId" + suffix, NEW_LINE_ITEM_ID));
+        LOG.debug("DeleteWithId successful");
+    }
+
+    /**
+     * Runs the blob field download check against the routes without a format option
+     * and against the {@code format=xml} routes.
+     */
+    @Test
+    public void testGetBlobField() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestGetBlobField(suffix);
+        }
+    }
+
+    /**
+     * Looks up the {@link Document} named {@code TEST_DOCUMENT_ID}, downloads its Body blob
+     * through {@code direct:getBlobField + suffix}, and writes the content to
+     * {@code target/getBlobField<suffix>.txt}.
+     * <p>
+     * Both channels are closed in {@code finally} blocks so that a failed transfer or a
+     * failure to open the target file does not leak the response stream or the file handle.
+     *
+     * @param suffix route name suffix, either empty or {@code "Xml"}
+     * @throws Exception if a request fails or the file cannot be written
+     */
+    public void doTestGetBlobField(String suffix) throws Exception {
+        // get document with Name "Test Document"; the headers name the Document object and
+        // its Name field for this request (presumably overriding the route's Line_Item__c options)
+        final HashMap<String, Object> headers = new HashMap<String, Object>();
+        headers.put(SalesforceEndpointConfig.SOBJECT_NAME, "Document");
+        headers.put(SalesforceEndpointConfig.SOBJECT_EXT_ID_NAME, "Name");
+        Document document = template().requestBodyAndHeaders("direct:getSObjectWithId" + suffix, TEST_DOCUMENT_ID,
+            headers, Document.class);
+        assertNotNull(document);
+        LOG.debug("GetWithId: {}", document);
+
+        // get Body field for this document
+        InputStream body = template().requestBody("direct:getBlobField" + suffix, document, InputStream.class);
+        assertNotNull(body);
+        LOG.debug("GetBlobField: {}", body);
+
+        // write body to test file
+        final ReadableByteChannel src = Channels.newChannel(body);
+        try {
+            final FileChannel fileChannel = new FileOutputStream("target/getBlobField" + suffix + ".txt").getChannel();
+            try {
+                fileChannel.transferFrom(src, 0, document.getBodyLength());
+            } finally {
+                fileChannel.close();
+            }
+        } finally {
+            // closing the channel also closes the wrapped response stream
+            src.close();
+        }
+    }
+
+    /**
+     * Runs the SOQL query check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testQuery() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestQuery(suffix);
+        }
+    }
+
+    private void doTestQuery(String suffix) throws InterruptedException {
+        QueryRecordsLine_Item__c queryRecords = template().requestBody("direct:query" + suffix, null,
+            QueryRecordsLine_Item__c.class);
+        assertNotNull(queryRecords);
+        LOG.debug("ExecuteQuery: {}", queryRecords);
+    }
+
+
+    /**
+     * Runs the search check against the route without a format option
+     * and against the {@code format=xml} route.
+     */
+    @Test
+    public void testSearch() throws Exception {
+        for (String suffix : new String[] {"", "Xml"}) {
+            doTestSearch(suffix);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doTestSearch(String suffix) throws InterruptedException {
+
+        Object obj = template().requestBody("direct:search" + suffix, (Object) null);
+        List<SearchResult> searchResults = null;
+        if (obj instanceof SearchResults) {
+            SearchResults results = (SearchResults) obj;
+            searchResults = results.getResults();
+        } else {
+            searchResults = (List<SearchResult>) obj;
+        }
+        assertNotNull(searchResults);
+        LOG.debug("ExecuteSearch: {}", searchResults);
+    }
+
+    /**
+     * Builds the test routes. Each operation gets a pair of {@code direct:} entry points:
+     * one forwarding to the salesforce endpoint without a format option and one, suffixed
+     * {@code Xml}, forwarding to the same operation with {@code format=xml}.
+     */
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // testGetVersions (the Xml variant shows the format being overridden per endpoint)
+                from("direct:getVersions").to("salesforce:getVersions");
+                from("direct:getVersionsXml").to("salesforce:getVersions?format=xml");
+
+                // testGetResources
+                from("direct:getResources").to("salesforce:getResources");
+                from("direct:getResourcesXml").to("salesforce:getResources?format=xml");
+
+                // testGetGlobalObjects
+                from("direct:getGlobalObjects").to("salesforce:getGlobalObjects");
+                from("direct:getGlobalObjectsXml").to("salesforce:getGlobalObjects?format=xml");
+
+                // testGetBasicInfo
+                from("direct:getBasicInfo").to("salesforce:getBasicInfo?sObjectName=Merchandise__c");
+                from("direct:getBasicInfoXml").to("salesforce:getBasicInfo?format=xml&sObjectName=Merchandise__c");
+
+                // testGetDescription
+                from("direct:getDescription").to("salesforce:getDescription?sObjectName=Merchandise__c");
+                from("direct:getDescriptionXml").to("salesforce:getDescription?format=xml&sObjectName=Merchandise__c");
+
+                // testGetSObject (the two variants request different fields on purpose)
+                from("direct:getSObject").to("salesforce:getSObject?sObjectName=Merchandise__c&sObjectFields=Description__c,Price__c");
+                from("direct:getSObjectXml").to("salesforce:getSObject?format=xml&sObjectName=Merchandise__c&sObjectFields=Description__c,Total_Inventory__c");
+
+                // testCreateUpdateDelete: create
+                from("direct:CreateSObject").to("salesforce:createSObject?sObjectName=Merchandise__c");
+                from("direct:CreateSObjectXml").to("salesforce:createSObject?format=xml&sObjectName=Merchandise__c");
+
+                // testCreateUpdateDelete: update
+                from("direct:UpdateSObject").to("salesforce:updateSObject?sObjectName=Merchandise__c");
+                from("direct:UpdateSObjectXml").to("salesforce:updateSObject?format=xml&sObjectName=Merchandise__c");
+
+                // testCreateUpdateDelete: delete
+                from("direct:deleteSObject").to("salesforce:deleteSObject?sObjectName=Merchandise__c");
+                from("direct:deleteSObjectXml").to("salesforce:deleteSObject?format=xml&sObjectName=Merchandise__c");
+
+                // testCreateUpdateDeleteWithId / testGetBlobField: lookup by external id
+                from("direct:getSObjectWithId").to("salesforce:getSObjectWithId?sObjectName=Line_Item__c&sObjectIdName=Name");
+                from("direct:getSObjectWithIdXml").to("salesforce:getSObjectWithId?format=xml&sObjectName=Line_Item__c&sObjectIdName=Name");
+
+                // testCreateUpdateDeleteWithId: upsert
+                from("direct:upsertSObject").to("salesforce:upsertSObject?sObjectName=Line_Item__c&sObjectIdName=Name");
+                from("direct:upsertSObjectXml").to("salesforce:upsertSObject?format=xml&sObjectName=Line_Item__c&sObjectIdName=Name");
+
+                // testCreateUpdateDeleteWithId: delete by external id
+                from("direct:deleteSObjectWithId").to("salesforce:deleteSObjectWithId?sObjectName=Line_Item__c&sObjectIdName=Name");
+                from("direct:deleteSObjectWithIdXml").to("salesforce:deleteSObjectWithId?format=xml&sObjectName=Line_Item__c&sObjectIdName=Name");
+
+                // testGetBlobField
+                from("direct:getBlobField").to("salesforce:getBlobField?sObjectName=Document&sObjectBlobFieldName=Body");
+                from("direct:getBlobFieldXml").to("salesforce:getBlobField?format=xml&sObjectName=Document&sObjectBlobFieldName=Body");
+
+                // testQuery
+                from("direct:query").to("salesforce:query?sObjectQuery=SELECT name from Line_Item__c&sObjectClass=org.apache.camel.component.salesforce.dto.QueryRecordsLine_Item__c");
+                from("direct:queryXml").to("salesforce:query?format=xml&sObjectQuery=SELECT name from Line_Item__c&sObjectClass=org.apache.camel.component.salesforce.dto.QueryRecordsLine_Item__c");
+
+                // testSearch
+                from("direct:search").to("salesforce:search?sObjectSearch=FIND {Wee}");
+                from("direct:searchXml").to("salesforce:search?format=xml&sObjectSearch=FIND {Wee}");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
new file mode 100644
index 0000000..d72ca2b
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiIntegrationTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.salesforce.api.dto.CreateSObjectResult;
+import org.apache.camel.component.salesforce.dto.Merchandise__c;
+import org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+public class StreamingApiIntegrationTest extends AbstractSalesforceTestBase {
+
+    /**
+     * Upserts a {@link Merchandise__c} record and expects exactly one notification for it on
+     * {@code mock:CamelTestTopic}, carrying the topic name and channel headers. The test record
+     * and the CamelTestTopic push topic are removed afterwards, whether or not the
+     * notification checks passed.
+     */
+    @Test
+    public void testSubscribeAndReceive() throws Exception {
+        final MockEndpoint topicMock = getMockEndpoint("mock:CamelTestTopic");
+        topicMock.expectedMessageCount(1);
+        // headers whose values are known up front
+        topicMock.expectedHeaderReceived("CamelSalesforceTopicName", "CamelTestTopic");
+        topicMock.expectedHeaderReceived("CamelSalesforceChannel", "/topic/CamelTestTopic");
+
+        Merchandise__c merchandise = new Merchandise__c();
+        merchandise.setName("TestNotification");
+        merchandise.setDescription__c("Merchandise for testing Streaming API updated on " + new DateTime());
+        merchandise.setPrice__c(9.99);
+        merchandise.setTotal_Inventory__c(1000.0);
+
+        // a null reply is accepted as well as a successful create result
+        final CreateSObjectResult upsertResult = template().requestBody(
+            "direct:upsertSObject", merchandise, CreateSObjectResult.class);
+        assertTrue("Merchandise test record not created",  upsertResult == null || upsertResult.getSuccess());
+
+        try {
+            // block until the notification arrives or the mock gives up
+            topicMock.assertIsSatisfied();
+            final Message notification = topicMock.getExchanges().get(0).getIn();
+            merchandise = notification.getMandatoryBody(Merchandise__c.class);
+            assertNotNull("Missing event body", merchandise);
+            log.info("Merchandise notification: {}", merchandise.toString());
+            assertNotNull("Missing field Id", merchandise.getId());
+            assertNotNull("Missing field Name", merchandise.getName());
+
+            // header whose value is only known at runtime
+            assertNotNull("Missing header CamelSalesforceClientId",
+                notification.getHeader("CamelSalesforceClientId"));
+
+        } finally {
+            // clean up the test record
+            assertNull(template().requestBody("direct:deleteSObjectWithId", merchandise));
+
+            // clean up the test topic: look it up with SOQL, then delete it
+            final QueryRecordsPushTopic topics = template().requestBody(
+                "direct:query", null, QueryRecordsPushTopic.class);
+            assertEquals("Test topic not found", 1, topics.getTotalSize());
+            assertNull(template().requestBody("direct:deleteSObject", topics.getRecords().get(0)));
+        }
+    }
+
+    /**
+     * Builds the routes used by the streaming test: the topic subscription feeding
+     * {@code mock:CamelTestTopic}, plus helper routes for creating the test record, finding
+     * the push topic, and removing both.
+     */
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                // test topic subscription
+                // NOTE: an sObjectClass=org.apache.camel.component.salesforce.dto.Merchandise__c
+                // option was left disabled here; the subscription sets sObjectName instead
+                from("salesforce:CamelTestTopic?notifyForFields=ALL&notifyForOperations=ALL&" +
+                    "sObjectName=Merchandise__c&" +
+                    "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Merchandise__c")
+                    .to("mock:CamelTestTopic");
+
+                // creates the test record
+                from("direct:upsertSObject")
+                    .to("salesforce:upsertSObject?SObjectIdName=Name");
+
+                // finds the test topic
+                from("direct:query")
+                    .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic WHERE Name = 'CamelTestTopic'&" +
+                        "sObjectClass=org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic");
+
+                // removes the test record
+                from("direct:deleteSObjectWithId")
+                    .to("salesforce:deleteSObjectWithId?sObjectIdName=Name");
+
+                // removes the topic
+                from("direct:deleteSObject")
+                    .to("salesforce:deleteSObject");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Document.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Document.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Document.java
new file mode 100644
index 0000000..090f2b1
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Document.java
@@ -0,0 +1,201 @@
+/*
+ * Salesforce DTO generated by camel-salesforce-maven-plugin
+ * Generated on: Tue May 14 21:15:54 PDT 2013
+ */
+package org.apache.camel.component.salesforce.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+
+/**
+ * Salesforce DTO for SObject Document.
+ * <p>
+ * Plain bean with one private field and one getter/setter pair per Document field. Field
+ * names keep the capitalized Salesforce spelling and every accessor carries a
+ * {@code @JsonProperty} with that same name. The class itself is aliased to
+ * {@code "Document"} for XStream.
+ * <p>
+ * NOTE(review): the header above says this file is generated by
+ * camel-salesforce-maven-plugin, so manual edits are presumably overwritten on
+ * regeneration - confirm before changing it by hand.
+ */
+@XStreamAlias("Document")
+public class Document extends AbstractSObjectBase {
+
+    // FolderId
+    private String FolderId;
+
+    @JsonProperty("FolderId")
+    public String getFolderId() {
+        return this.FolderId;
+    }
+
+    @JsonProperty("FolderId")
+    public void setFolderId(String FolderId) {
+        this.FolderId = FolderId;
+    }
+
+    // DeveloperName
+    private String DeveloperName;
+
+    @JsonProperty("DeveloperName")
+    public String getDeveloperName() {
+        return this.DeveloperName;
+    }
+
+    @JsonProperty("DeveloperName")
+    public void setDeveloperName(String DeveloperName) {
+        this.DeveloperName = DeveloperName;
+    }
+
+    // NamespacePrefix
+    private String NamespacePrefix;
+
+    @JsonProperty("NamespacePrefix")
+    public String getNamespacePrefix() {
+        return this.NamespacePrefix;
+    }
+
+    @JsonProperty("NamespacePrefix")
+    public void setNamespacePrefix(String NamespacePrefix) {
+        this.NamespacePrefix = NamespacePrefix;
+    }
+
+    // ContentType
+    private String ContentType;
+
+    @JsonProperty("ContentType")
+    public String getContentType() {
+        return this.ContentType;
+    }
+
+    @JsonProperty("ContentType")
+    public void setContentType(String ContentType) {
+        this.ContentType = ContentType;
+    }
+
+    // Type
+    private String Type;
+
+    @JsonProperty("Type")
+    public String getType() {
+        return this.Type;
+    }
+
+    @JsonProperty("Type")
+    public void setType(String Type) {
+        this.Type = Type;
+    }
+
+    // IsPublic
+    private Boolean IsPublic;
+
+    @JsonProperty("IsPublic")
+    public Boolean getIsPublic() {
+        return this.IsPublic;
+    }
+
+    @JsonProperty("IsPublic")
+    public void setIsPublic(Boolean IsPublic) {
+        this.IsPublic = IsPublic;
+    }
+
+    // BodyLength
+    private Integer BodyLength;
+
+    @JsonProperty("BodyLength")
+    public Integer getBodyLength() {
+        return this.BodyLength;
+    }
+
+    @JsonProperty("BodyLength")
+    public void setBodyLength(Integer BodyLength) {
+        this.BodyLength = BodyLength;
+    }
+
+    // Body
+    // blob field url, use getBlobField to get the content
+    // The Java field is named BodyUrl, but both the XStream alias and the JSON property
+    // map it to "Body".
+    @XStreamAlias("Body")
+    private String BodyUrl;
+
+    @JsonProperty("Body")
+    public String getBodyUrl() {
+        return this.BodyUrl;
+    }
+
+    @JsonProperty("Body")
+    public void setBodyUrl(String BodyUrl) {
+        this.BodyUrl = BodyUrl;
+    }
+
+    // Url
+    private String Url;
+
+    @JsonProperty("Url")
+    public String getUrl() {
+        return this.Url;
+    }
+
+    @JsonProperty("Url")
+    public void setUrl(String Url) {
+        this.Url = Url;
+    }
+
+    // Description
+    private String Description;
+
+    @JsonProperty("Description")
+    public String getDescription() {
+        return this.Description;
+    }
+
+    @JsonProperty("Description")
+    public void setDescription(String Description) {
+        this.Description = Description;
+    }
+
+    // Keywords
+    private String Keywords;
+
+    @JsonProperty("Keywords")
+    public String getKeywords() {
+        return this.Keywords;
+    }
+
+    @JsonProperty("Keywords")
+    public void setKeywords(String Keywords) {
+        this.Keywords = Keywords;
+    }
+
+    // IsInternalUseOnly
+    private Boolean IsInternalUseOnly;
+
+    @JsonProperty("IsInternalUseOnly")
+    public Boolean getIsInternalUseOnly() {
+        return this.IsInternalUseOnly;
+    }
+
+    @JsonProperty("IsInternalUseOnly")
+    public void setIsInternalUseOnly(Boolean IsInternalUseOnly) {
+        this.IsInternalUseOnly = IsInternalUseOnly;
+    }
+
+    // AuthorId
+    private String AuthorId;
+
+    @JsonProperty("AuthorId")
+    public String getAuthorId() {
+        return this.AuthorId;
+    }
+
+    @JsonProperty("AuthorId")
+    public void setAuthorId(String AuthorId) {
+        this.AuthorId = AuthorId;
+    }
+
+    // IsBodySearchable
+    private Boolean IsBodySearchable;
+
+    @JsonProperty("IsBodySearchable")
+    public Boolean getIsBodySearchable() {
+        return this.IsBodySearchable;
+    }
+
+    @JsonProperty("IsBodySearchable")
+    public void setIsBodySearchable(Boolean IsBodySearchable) {
+        this.IsBodySearchable = IsBodySearchable;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0c401b9f/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Line_Item__c.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Line_Item__c.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Line_Item__c.java
new file mode 100644
index 0000000..1bf3668
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/dto/Line_Item__c.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.dto;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
+
+/**
+ * Test DTO for the Line_Item__c SObject.
+ * <p>
+ * Each property keeps the capitalized Salesforce field name and is exposed through a
+ * getter/setter pair annotated with {@code @JsonProperty} of the same name.
+ */
+@XStreamAlias("Line_Item__c")
+public class Line_Item__c extends AbstractSObjectBase {
+
+    // Unit_Price__c
+    private Double Unit_Price__c;
+
+    @JsonProperty("Unit_Price__c")
+    public Double getUnit_Price__c() {
+        return this.Unit_Price__c;
+    }
+
+    @JsonProperty("Unit_Price__c")
+    public void setUnit_Price__c(Double unit_Price__c) {
+        this.Unit_Price__c = unit_Price__c;
+    }
+
+    // Units_Sold__c
+    private Double Units_Sold__c;
+
+    @JsonProperty("Units_Sold__c")
+    public Double getUnits_Sold__c() {
+        return this.Units_Sold__c;
+    }
+
+    @JsonProperty("Units_Sold__c")
+    public void setUnits_Sold__c(Double units_Sold__c) {
+        this.Units_Sold__c = units_Sold__c;
+    }
+
+    // Merchandise__c
+    private String Merchandise__c;
+
+    @JsonProperty("Merchandise__c")
+    public String getMerchandise__c() {
+        return this.Merchandise__c;
+    }
+
+    @JsonProperty("Merchandise__c")
+    public void setMerchandise__c(String merchandise__c) {
+        this.Merchandise__c = merchandise__c;
+    }
+
+    // Invoice_Statement__c
+    private String Invoice_Statement__c;
+
+    @JsonProperty("Invoice_Statement__c")
+    public String getInvoice_Statement__c() {
+        return this.Invoice_Statement__c;
+    }
+
+    @JsonProperty("Invoice_Statement__c")
+    public void setInvoice_Statement__c(String invoice_Statement__c) {
+        this.Invoice_Statement__c = invoice_Statement__c;
+    }
+
+}