You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2018/07/06 16:05:41 UTC

[camel] branch master updated: CAMEL-11803: Salesforce platform events

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3908657  CAMEL-11803: Salesforce platform events
3908657 is described below

commit 3908657d1340fa31e7cecccb6507f3212abf7b36
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Fri Jul 6 18:05:08 2018 +0200

    CAMEL-11803: Salesforce platform events
    
    This adds support for consuming Salesforce platform events via the
    CometD long polling mechanism used only for streaming PushTopic
    previously.
    
    For emitting platform events, preexisting `createSObject` operation can
    be used.
---
 .../camel-salesforce-component/pom.xml             |   6 +
 .../src/main/docs/salesforce-component.adoc        |  56 +++++
 .../component/salesforce/SalesforceConsumer.java   | 235 ++++++++++--------
 .../salesforce/api/dto/PlatformEvent.java          |  83 +++++++
 .../internal/streaming/SubscriptionHelper.java     |  21 +-
 .../PlatformEventsConsumerIntegrationTest.java     |  69 ++++++
 .../salesforce/SalesforceConsumerTest.java         | 266 +++++++++++++++++++++
 .../salesforce/api/dto/PlatformEventTest.java      |  48 ++++
 .../internal/streaming/SubscriptionHelperTest.java |   8 +
 .../salesforce/objects/TestEvent__e.object         |  35 +++
 .../it/resources/salesforce/package.xml            |   6 +-
 11 files changed, 731 insertions(+), 102 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml b/components/camel-salesforce/camel-salesforce-component/pom.xml
index b4e41bd..f9892c0 100644
--- a/components/camel-salesforce/camel-salesforce-component/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-component/pom.xml
@@ -204,6 +204,12 @@
       <version>${okclient-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>${awaitility-version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index a2276a9..9853e5c 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -212,6 +212,62 @@ To subscribe to an existing topic
 from("salesforce:CamelTestTopic&sObjectName=Merchandise__c")...
 ----
 
+==== Platform events
+
+To emit a platform event use the `createSObject` operation. The message
+body can be a JSON string or an InputStream with key-value data -- in
+that case `sObjectName` needs to be set to the API name of the event --
+or an instance of a class that extends AbstractDTOBase and is named
+after the API name of the event.
+
+For example using a DTO:
+
+[source,java]
+----
+class Order_Event__e extends AbstractDTOBase {
+  @JsonProperty("OrderNumber")
+  private String orderNumber;
+  // ... other properties and getters/setters
+}
+
+from("timer:tick")
+    .process(exchange -> {
+        final Message in = exchange.getIn();
+        String orderNumber = "ORD" + String.valueOf(in.getHeader(Exchange.TIMER_COUNTER));
+        Order_Event__e event = new Order_Event__e();
+        event.setOrderNumber(orderNumber);
+        in.setBody(event);
+    })
+    .to("salesforce:createSObject");
+----
+
+Or using JSON event data:
+
+[source,java]
+----
+from("timer:tick")
+    .process(exchange -> {
+        final Message in = exchange.getIn();
+        String orderNumber = "ORD" + String.valueOf(in.getHeader(Exchange.TIMER_COUNTER));
+        in.setBody("{\"OrderNumber\":\"" + orderNumber + "\"}");
+    })
+    .to("salesforce:createSObject?sObjectName=Order_Event__e");
+----
+
+To receive platform events use the consumer endpoint with the API name of
+the platform event prefixed with `event/` (or `/event/`), e.g.:
+`salesforce:event/Order_Event__e`. A processor consuming from that
+endpoint will receive either an `org.apache.camel.component.salesforce.api.dto.PlatformEvent`
+object or an `org.cometd.bayeux.Message` in the body depending on
+`rawPayload` being `false` or `true` respectively.
+
+For example, in the simplest form to consume one event:
+
+[source,java]
+----
+PlatformEvent event = consumer.receiveBody("salesforce:event/Order_Event__e", PlatformEvent.class);
+----
+
 === Examples
 
 ==== Uploading a document to a ContentWorkspace
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index 839ba8b..861edec 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -18,20 +18,20 @@ package org.apache.camel.component.salesforce;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.util.HashMap;
 import java.util.Map;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
 import org.apache.camel.component.salesforce.api.utils.JsonUtils;
 import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.apache.camel.component.salesforce.internal.streaming.PushTopicHelper;
 import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
@@ -41,41 +41,57 @@ import org.cometd.bayeux.client.ClientSessionChannel;
  */
 public class SalesforceConsumer extends DefaultConsumer {
 
+    private enum MessageKind {
+        PLATFORM_EVENT, PUSH_TOPIC;
+
+        public static MessageKind fromTopicName(final String topicName) {
+            if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
+                return MessageKind.PLATFORM_EVENT;
+            }
+
+            return PUSH_TOPIC;
+        }
+    }
 
-    private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
-    private static final String EVENT_PROPERTY = "event";
-    private static final String TYPE_PROPERTY = "type";
     private static final String CREATED_DATE_PROPERTY = "createdDate";
-    private static final String SOBJECT_PROPERTY = "sobject";
-    private static final String REPLAY_ID_PROPERTY = "replayId";
+    private static final String EVENT_PROPERTY = "event";
     private static final double MINIMUM_VERSION = 24.0;
+    private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
+    private static final String REPLAY_ID_PROPERTY = "replayId";
+    private static final String SOBJECT_PROPERTY = "sobject";
+    private static final String TYPE_PROPERTY = "type";
 
     private final SalesforceEndpoint endpoint;
-    private final SubscriptionHelper subscriptionHelper;
+    private final MessageKind messageKind;
     private final ObjectMapper objectMapper;
 
-    private final String topicName;
+    private final boolean rawPayload;
     private final Class<?> sObjectClass;
     private boolean subscribed;
+    private final SubscriptionHelper subscriptionHelper;
+    private final String topicName;
 
-
-    public SalesforceConsumer(SalesforceEndpoint endpoint, Processor processor, SubscriptionHelper helper) {
+    public SalesforceConsumer(final SalesforceEndpoint endpoint, final Processor processor, final SubscriptionHelper helper) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        ObjectMapper configuredObjectMapper = endpoint.getConfiguration().getObjectMapper();
+        final ObjectMapper configuredObjectMapper = endpoint.getConfiguration().getObjectMapper();
         if (configuredObjectMapper != null) {
-            this.objectMapper = configuredObjectMapper;
+            objectMapper = configuredObjectMapper;
         } else {
-            this.objectMapper = OBJECT_MAPPER;
+            objectMapper = OBJECT_MAPPER;
         }
 
         // check minimum supported API version
-        if (Double.valueOf(endpoint.getConfiguration().getApiVersion()) < MINIMUM_VERSION) {
+        if (Double.parseDouble(endpoint.getConfiguration().getApiVersion()) < MINIMUM_VERSION) {
             throw new IllegalArgumentException("Minimum supported API version for consumer endpoints is " + 24.0);
         }
 
-        this.topicName = endpoint.getTopicName();
-        this.subscriptionHelper = helper;
+        topicName = endpoint.getTopicName();
+        subscriptionHelper = helper;
+
+        messageKind = MessageKind.fromTopicName(topicName);
+
+        rawPayload = endpoint.getConfiguration().getRawPayload();
 
         // get sObjectClass to convert to
         final String sObjectName = endpoint.getConfiguration().getSObjectName();
@@ -99,66 +115,87 @@ public class SalesforceConsumer extends DefaultConsumer {
 
     }
 
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
+    public String getTopicName() {
+        return topicName;
+    }
 
-        final SalesforceEndpointConfig config = endpoint.getConfiguration();
+    @Override
+    public void handleException(String message, Throwable t) {
+        super.handleException(message, t);
+    }
 
-        // is a query configured in the endpoint?
-        if (config.getSObjectQuery() != null) {
-            // Note that we don't lookup topic if the query is not specified
-            // create REST client for PushTopic operations
-            final SalesforceComponent salesforceComponent = endpoint.getComponent();
-            final RestClient restClient = salesforceComponent.createRestClientFor(endpoint);
+    public void processMessage(final ClientSessionChannel channel, final Message message) {
+        if (log.isDebugEnabled()) {
+            log.debug("Received event {} on channel {}", channel.getId(), channel.getChannelId());
+        }
 
-            // don't forget to start the client
-            ServiceHelper.startService(restClient);
+        final Exchange exchange = endpoint.createExchange();
+        final org.apache.camel.Message in = exchange.getIn();
+
+        switch (messageKind) {
+        case PUSH_TOPIC:
+            createPushTopicMessage(message, in);
+            break;
+        case PLATFORM_EVENT:
+            createPlatformEventMessage(message, in);
+            break;
+        }
 
-            try {
-                PushTopicHelper helper = new PushTopicHelper(config, topicName, restClient);
-                helper.createOrUpdateTopic();
-            } finally {
-                // don't forget to stop the client
-                ServiceHelper.stopService(restClient);
+        try {
+            getAsyncProcessor().process(exchange);
+        } catch (final Exception e) {
+            final String msg = String.format("Error processing %s: %s", exchange, e);
+            handleException(msg, new SalesforceException(msg, e));
+        } finally {
+            final Exception ex = exchange.getException();
+            if (ex != null) {
+                final String msg = String.format("Unhandled exception: %s", ex.getMessage());
+                handleException(msg, new SalesforceException(msg, ex));
             }
         }
-
-        // subscribe to topic
-        subscriptionHelper.subscribe(topicName, this);
-        subscribed = true;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
+    void createPlatformEventMessage(final Message message, final org.apache.camel.Message in) {
+        setHeaders(in, message);
 
-        if (subscribed) {
-            subscribed = false;
-            // unsubscribe from topic
-            subscriptionHelper.unsubscribe(topicName, this);
+        final Map<String, Object> data = message.getDataAsMap();
+
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> event = (Map<String, Object>) data.get("event");
+
+        final Object replayId = event.get(REPLAY_ID_PROPERTY);
+        if (replayId != null) {
+            in.setHeader("CamelSalesforceReplayId", replayId);
         }
+
+        in.setHeader("CamelSalesforcePlatformEventSchema", data.get("schema"));
+        in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
+
+        final Object payload = data.get("payload");
+
+        final PlatformEvent platformEvent = objectMapper.convertValue(payload, PlatformEvent.class);
+        in.setHeader("CamelSalesforceCreatedDate", platformEvent.getCreated());
+
+        if (rawPayload) {
+            in.setBody(message);
+        } else {
+            in.setBody(platformEvent);
+        }
+
     }
 
-    public void processMessage(ClientSessionChannel channel, Message message) {
-        final Exchange exchange = endpoint.createExchange();
-        org.apache.camel.Message in = exchange.getIn();
+    void createPushTopicMessage(final Message message, final org.apache.camel.Message in) {
         setHeaders(in, message);
 
-        // get event data
-        // TODO do we need to add NPE checks for message/data.get***???
-        Map<String, Object> data = message.getDataAsMap();
+        final Map<String, Object> data = message.getDataAsMap();
 
         @SuppressWarnings("unchecked")
         final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY);
         final Object eventType = event.get(TYPE_PROPERTY);
-        Object createdDate = event.get(CREATED_DATE_PROPERTY);
-        Object replayId = event.get(REPLAY_ID_PROPERTY);
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Received event %s on channel %s created on %s",
-                    eventType, channel.getChannelId(), createdDate));
-        }
+        final Object createdDate = event.get(CREATED_DATE_PROPERTY);
+        final Object replayId = event.get(REPLAY_ID_PROPERTY);
 
+        in.setHeader("CamelSalesforceTopicName", topicName);
         in.setHeader("CamelSalesforceEventType", eventType);
         in.setHeader("CamelSalesforceCreatedDate", createdDate);
         if (replayId != null) {
@@ -173,7 +210,7 @@ public class SalesforceConsumer extends DefaultConsumer {
             final String sObjectString = objectMapper.writeValueAsString(sObject);
             log.debug("Received SObject: {}", sObjectString);
 
-            if (endpoint.getConfiguration().getRawPayload()) {
+            if (rawPayload) {
                 // return sobject string as exchange body
                 in.setBody(sObjectString);
             } else if (sObjectClass == null) {
@@ -181,55 +218,61 @@ public class SalesforceConsumer extends DefaultConsumer {
                 in.setBody(sObject);
             } else {
                 // create the expected SObject
-                in.setBody(objectMapper.readValue(
-                        new StringReader(sObjectString), sObjectClass));
+                in.setBody(objectMapper.readValue(new StringReader(sObjectString), sObjectClass));
             }
-        } catch (IOException e) {
-            final String msg = String.format("Error parsing message [%s] from Topic %s: %s",
-                    message, topicName, e.getMessage());
+        } catch (final IOException e) {
+            final String msg = String.format("Error parsing message [%s] from Topic %s: %s", message, topicName, e.getMessage());
             handleException(msg, new SalesforceException(msg, e));
         }
+    }
 
-        try {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // noop
-                    if (log.isTraceEnabled()) {
-                        log.trace("Done processing event: {} {}", eventType.toString(),
-                                doneSync ? "synchronously" : "asynchronously");
-                    }
-                }
-            });
-        } catch (Exception e) {
-            String msg = String.format("Error processing %s: %s", exchange, e);
-            handleException(msg, new SalesforceException(msg, e));
-        } finally {
-            Exception ex = exchange.getException();
-            if (ex != null) {
-                String msg = String.format("Unhandled exception: %s", ex.getMessage());
-                handleException(msg, new SalesforceException(msg, ex));
-            }
+    void setHeaders(final org.apache.camel.Message in, final Message message) {
+        in.setHeader("CamelSalesforceChannel", message.getChannel());
+        final String clientId = message.getClientId();
+        if (ObjectHelper.isNotEmpty(clientId)) {
+            in.setHeader("CamelSalesforceClientId", clientId);
         }
     }
 
-    private void setHeaders(org.apache.camel.Message in, Message message) {
-        Map<String, Object> headers = new HashMap<>();
-        // set topic name
-        headers.put("CamelSalesforceTopicName", topicName);
-        // set message properties as headers
-        headers.put("CamelSalesforceChannel", message.getChannel());
-        headers.put("CamelSalesforceClientId", message.getClientId());
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final SalesforceEndpointConfig config = endpoint.getConfiguration();
 
-        in.setHeaders(headers);
+        // is a query configured in the endpoint?
+        if (messageKind == MessageKind.PUSH_TOPIC && ObjectHelper.isNotEmpty(config.getSObjectQuery())) {
+            // Note that we don't lookup topic if the query is not specified
+            // create REST client for PushTopic operations
+            final SalesforceComponent salesforceComponent = endpoint.getComponent();
+            final RestClient restClient = salesforceComponent.createRestClientFor(endpoint);
+
+            // don't forget to start the client
+            ServiceHelper.startService(restClient);
+
+            try {
+                final PushTopicHelper helper = new PushTopicHelper(config, topicName, restClient);
+                helper.createOrUpdateTopic();
+            } finally {
+                // don't forget to stop the client
+                ServiceHelper.stopService(restClient);
+            }
+        }
+
+        // subscribe to topic
+        subscriptionHelper.subscribe(topicName, this);
+        subscribed = true;
     }
 
     @Override
-    public void handleException(String message, Throwable t) {
-        super.handleException(message, t);
-    }
+    protected void doStop() throws Exception {
+        super.doStop();
 
-    public String getTopicName() {
-        return topicName;
+        if (subscribed) {
+            subscribed = false;
+            // unsubscribe from topic
+            subscriptionHelper.unsubscribe(topicName, this);
+        }
     }
 
 }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java
new file mode 100644
index 0000000..842d4f7
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.api.dto;
+
+import java.io.Serializable;
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public final class PlatformEvent implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ZonedDateTime created;
+
+    private final String createdById;
+
+    private final Map<String, String> eventData = new HashMap<>();
+
+    @JsonCreator
+    public PlatformEvent(@JsonProperty("CreatedDate") final ZonedDateTime created, @JsonProperty("CreatedById") final String createdById) {
+        this.created = created;
+        this.createdById = createdById;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof PlatformEvent)) {
+            return false;
+        }
+
+        final PlatformEvent other = (PlatformEvent) obj;
+
+        return Objects.equals(created, other.created) && Objects.equals(createdById, other.createdById) && Objects.equals(eventData, other.eventData);
+    }
+
+    public ZonedDateTime getCreated() {
+        return created;
+    }
+
+    public String getCreatedById() {
+        return createdById;
+    }
+
+    public Map<String, String> getEventData() {
+        return eventData;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(created, createdById, eventData);
+    }
+
+    @JsonAnySetter
+    public void set(final String name, final String value) {
+        eventData.put(name, value);
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append("PlatformEvent: createdById: ").append(createdById).append(", createdId: ").append(created).append(", data: ").append(eventData)
+            .toString();
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index e3a84d1..c10aa92 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -459,8 +459,25 @@ public class SubscriptionHelper extends ServiceSupport {
             .filter(Objects::nonNull).findFirst();
     }
 
-    static String getChannelName(String topicName) {
-        return "/topic/" + topicName;
+    static String getChannelName(final String topicName) {
+        final StringBuilder channelName = new StringBuilder();
+        if (topicName.charAt(0) != '/') {
+            channelName.append('/');
+        }
+
+        if (topicName.indexOf('/', 1) > 0) {
+            channelName.append(topicName);
+        } else {
+            channelName.append("topic/");
+            channelName.append(topicName);
+        }
+
+        final int typeIdx = channelName.indexOf("/", 1);
+        if ("event".equals(channelName.substring(1, typeIdx)) && !topicName.endsWith("__e")) {
+            channelName.append("__e");
+        }
+
+        return channelName.toString();
     }
 
     public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java
new file mode 100644
index 0000000..18d052b
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.entry;
+
+public class PlatformEventsConsumerIntegrationTest extends AbstractSalesforceTestBase {
+
+    @Test
+    public void shouldConsumePlatformEvents() throws InterruptedException, ExecutionException {
+        final ExecutorService parallel = Executors.newSingleThreadExecutor();
+
+        final Future<PlatformEvent> futurePlatformEvent = parallel.submit(() -> consumer.receiveBody("salesforce:event/TestEvent__e", PlatformEvent.class));
+
+        // it takes some time for the subscriber to subscribe, so we'll try to
+        // send repeated platform events and wait until the first one is
+        // received
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            template.sendBody("direct:sendPlatformEvent", "{\"Test_Field__c\": \"data\"}");
+
+            Assertions.assertThat(futurePlatformEvent.isDone()).isTrue();
+        });
+
+        final PlatformEvent platformEvent = futurePlatformEvent.get();
+        Assertions.assertThat(platformEvent).isNotNull();
+        Assertions.assertThat(platformEvent.getEventData()).containsOnly(entry("Test_Field__c", "data"));
+    }
+
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:sendPlatformEvent").to("salesforce:createSObject?sObjectName=TestEvent__e");
+            }
+        };
+    }
+
+    @Override
+    protected String salesforceApiVersionToUse() {
+        return "41.0";
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
new file mode 100644
index 0000000..80db986
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
+import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper;
+import org.apache.camel.spi.ClassResolver;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.common.HashMapMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class SalesforceConsumerTest {
+
+    public static class AccountUpdates {
+
+        @JsonProperty("Id")
+        String id;
+
+        @JsonProperty("Name")
+        String name;
+
+        @JsonProperty("Phone")
+        String phone;
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof AccountUpdates)) {
+                return false;
+            }
+
+            final AccountUpdates other = (AccountUpdates) obj;
+
+            return Objects.equals(id, other.id) && Objects.equals(name, other.name) && Objects.equals(phone, other.phone);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, name, phone);
+        }
+
+    }
+
+    static final SubscriptionHelper NOT_USED = null;
+
+    SalesforceEndpointConfig configuration = new SalesforceEndpointConfig();
+
+    SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
+
+    Exchange exchange = mock(Exchange.class);
+
+    org.apache.camel.Message in = mock(org.apache.camel.Message.class);
+
+    AsyncProcessor processor = mock(AsyncProcessor.class);
+
+    Message pushTopicMessage;
+
+    @Before
+    public void setupMocks() {
+        when(endpoint.getConfiguration()).thenReturn(configuration);
+        when(endpoint.createExchange()).thenReturn(exchange);
+        when(exchange.getIn()).thenReturn(in);
+        final SalesforceComponent component = mock(SalesforceComponent.class);
+        when(endpoint.getComponent()).thenReturn(component);
+        final CamelContext camelContext = mock(CamelContext.class);
+        when(component.getCamelContext()).thenReturn(camelContext);
+        final ClassResolver classResolver = mock(ClassResolver.class);
+        when(camelContext.getClassResolver()).thenReturn(classResolver);
+        when(classResolver.resolveClass(AccountUpdates.class.getName())).thenReturn((Class) AccountUpdates.class);
+
+        pushTopicMessage = createPushTopicMessage();
+    }
+
+    @Test
+    public void shouldProcessMappedPayloadPushTopicMessages() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+        configuration.setSObjectClass(AccountUpdates.class.getName());
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+
+        final AccountUpdates accountUpdates = new AccountUpdates();
+        accountUpdates.phone = "(415) 555-1212";
+        accountUpdates.id = "001D000000KneakIAB";
+        accountUpdates.name = "Blackbeard";
+
+        verify(in).setBody(accountUpdates);
+        verify(in).setHeader("CamelSalesforceEventType", "created");
+        verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+        verify(in).setHeader("CamelSalesforceReplayId", 1L);
+        verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+        verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+        verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        verify(processor).process(same(exchange));
+    }
+
+    @Test
+    public void shouldProcessPlatformEvents() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e");
+
+        final Message message = new HashMapMessage();
+        final Map<String, Object> data = new HashMap<>();
+        data.put("schema", "30H2pgzuWcF844p26Ityvg");
+
+        final Map<String, Object> payload = new HashMap<>();
+        payload.put("Test_Field__c", "abc");
+        payload.put("CreatedById", "00541000002WYFpAAO");
+        payload.put("CreatedDate", "2018-07-06T12:41:04Z");
+        data.put("payload", payload);
+        data.put("event", Collections.singletonMap("replayId", 4L));
+        message.put("data", data);
+        message.put("channel", "/event/TestEvent__e");
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), message);
+
+        final ZonedDateTime created = ZonedDateTime.parse("2018-07-06T12:41:04Z");
+        final PlatformEvent event = new PlatformEvent(created, "00541000002WYFpAAO");
+        event.set("Test_Field__c", "abc");
+        verify(in).setBody(event);
+        verify(in).setHeader("CamelSalesforceCreatedDate", created);
+        verify(in).setHeader("CamelSalesforceReplayId", 4L);
+        verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e");
+        verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e");
+        verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg");
+
+        verify(processor).process(same(exchange));
+
+        verifyNoMoreInteractions(in, processor);
+    }
+
+    @Test
+    public void shouldProcessPushTopicMessages() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+
+        @SuppressWarnings("unchecked")
+        final Object sobject = ((Map<String, Object>) pushTopicMessage.get("data")).get("sobject");
+        verify(in).setBody(sobject);
+        verify(in).setHeader("CamelSalesforceEventType", "created");
+        verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+        verify(in).setHeader("CamelSalesforceReplayId", 1L);
+        verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+        verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+        verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        verify(processor).process(same(exchange));
+    }
+
+    @Test
+    public void shouldProcessRawPayloadPushTopicMessages() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+        configuration.setRawPayload(true);
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+
+        verify(in).setBody("{\"Phone\":\"(415) 555-1212\",\"Id\":\"001D000000KneakIAB\",\"Name\":\"Blackbeard\"}");
+        verify(in).setHeader("CamelSalesforceEventType", "created");
+        verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+        verify(in).setHeader("CamelSalesforceReplayId", 1L);
+        verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+        verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+        verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        verify(processor).process(same(exchange));
+    }
+
+    @Test
+    public void shouldProcessRawPlatformEvents() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e");
+        configuration.setRawPayload(true);
+
+        final Message message = new HashMapMessage();
+        final Map<String, Object> data = new HashMap<>();
+        data.put("schema", "30H2pgzuWcF844p26Ityvg");
+
+        final Map<String, Object> payload = new HashMap<>();
+        payload.put("Test_Field__c", "abc");
+        payload.put("CreatedById", "00541000002WYFpAAO");
+        payload.put("CreatedDate", "2018-07-06T12:41:04Z");
+        data.put("payload", payload);
+        data.put("event", Collections.singletonMap("replayId", 4L));
+        message.put("data", data);
+        message.put("channel", "/event/TestEvent__e");
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), message);
+
+        verify(in).setBody(message);
+        verify(in).setHeader("CamelSalesforceCreatedDate", ZonedDateTime.parse("2018-07-06T12:41:04Z"));
+        verify(in).setHeader("CamelSalesforceReplayId", 4L);
+        verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e");
+        verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e");
+        verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg");
+
+        verify(processor).process(same(exchange));
+
+        verifyNoMoreInteractions(in, processor);
+    }
+
+    static Message createPushTopicMessage() {
+        final Message pushTopicMessage = new HashMapMessage();
+        pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        final Map<String, Object> data = new HashMap<>();
+        pushTopicMessage.put("data", data);
+
+        final Map<String, Object> event = new HashMap<>();
+        data.put("event", event);
+
+        event.put("createdDate", "2016-09-16T19:45:27.454Z");
+        event.put("replayId", 1L);
+        event.put("type", "created");
+
+        final Map<String, Object> sobject = new HashMap<>();
+        data.put("sobject", sobject);
+
+        sobject.put("Phone", "(415) 555-1212");
+        sobject.put("Id", "001D000000KneakIAB");
+        sobject.put("Name", "Blackbeard");
+
+        pushTopicMessage.put("channel", "/topic/AccountUpdates");
+        return pushTopicMessage;
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java
new file mode 100644
index 0000000..094b28d
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.api.dto;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+
+/**
+ * Checks JSON deserialization of {@link PlatformEvent} using the object
+ * mapper created by {@link JsonUtils}.
+ */
+public class PlatformEventTest {
+
+    /**
+     * {@code CreatedDate} and {@code CreatedById} are expected on their own
+     * properties, while every other field is expected in the event data.
+     */
+    @Test
+    public void shouldDeserialize() throws IOException {
+        final String json = "{\n"
+            + "  \"CreatedDate\": \"2017-04-14T13:35:23Z\", \n"
+            + "  \"CreatedById\": \"005B00000031mqb\", \n"
+            + "  \"Order_Number__c\": \"10013\", \n"
+            + "  \"Type__c\": \"Placed\"\n"
+            + "}";
+
+        final ObjectMapper objectMapper = JsonUtils.createObjectMapper();
+        final PlatformEvent deserialized = objectMapper.readValue(json, PlatformEvent.class);
+
+        final ZonedDateTime expectedCreated = ZonedDateTime.parse("2017-04-14T13:35:23Z");
+        assertThat(deserialized.getCreated()).isEqualTo(expectedCreated);
+        assertThat(deserialized.getCreatedById()).isEqualTo("005B00000031mqb");
+        assertThat(deserialized.getEventData()).containsOnly(
+            entry("Order_Number__c", "10013"),
+            entry("Type__c", "Placed"));
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index dc5e454..56b4a8b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -27,6 +27,7 @@ import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
 import org.junit.Test;
 
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -104,4 +105,11 @@ public class SubscriptionHelperTest {
         assertEquals("Expecting replayId for `my-topic-3` to be 6, as it is endpoint configured explicitly on the endpoint",
                      Optional.of(6L), determineReplayIdFor(endpoint, "my-topic-3"));
     }
+
+    /**
+     * Plain topic names are expected under {@code /topic/}; names starting
+     * with {@code event/} are expected under {@code /event/} with the
+     * {@code __e} suffix present exactly once.
+     */
+    @Test
+    public void shouldDetermineChannelNames() {
+        final String pushTopicChannel = SubscriptionHelper.getChannelName("topic1");
+        assertThat(pushTopicChannel).isEqualTo("/topic/topic1");
+
+        final String eventChannelWithoutSuffix = SubscriptionHelper.getChannelName("event/Test");
+        assertThat(eventChannelWithoutSuffix).isEqualTo("/event/Test__e");
+
+        final String eventChannelWithSuffix = SubscriptionHelper.getChannelName("event/Test__e");
+        assertThat(eventChannelWithSuffix).isEqualTo("/event/Test__e");
+    }
 }
diff --git a/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object b/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object
new file mode 100644
index 0000000..bab35f9
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
+    <deploymentStatus>Deployed</deploymentStatus>
+    <eventType>StandardVolume</eventType>
+    <fields>
+        <fullName>Test_Field__c</fullName>
+        <externalId>false</externalId>
+        <isFilteringDisabled>false</isFilteringDisabled>
+        <isNameField>false</isNameField>
+        <isSortingDisabled>false</isSortingDisabled>
+        <label>Test Field</label>
+        <length>20</length>
+        <required>false</required>
+        <type>Text</type>
+        <unique>false</unique>
+    </fields>
+    <label>Test Event</label>
+    <pluralLabel>Test Event</pluralLabel>
+</CustomObject>
\ No newline at end of file
diff --git a/components/camel-salesforce/it/resources/salesforce/package.xml b/components/camel-salesforce/it/resources/salesforce/package.xml
index eac3328..448f80f 100644
--- a/components/camel-salesforce/it/resources/salesforce/package.xml
+++ b/components/camel-salesforce/it/resources/salesforce/package.xml
@@ -42,9 +42,7 @@
         <name>CustomField</name>
     </types>
     <types>
-        <members>Invoice__c</members>
-        <members>Line_Item__c</members>
-        <members>Merchandise__c</members>
+        <members>*</members>
         <name>CustomObject</name>
     </types>
     <types>
@@ -89,5 +87,5 @@
         <members>CamelSalesforceIntegrationTests</members>
         <name>ConnectedApp</name>
     </types>
-    <version>38.0</version>
+    <version>41.0</version>
 </Package>