Posted to commits@camel.apache.org by al...@apache.org on 2019/06/03 07:44:52 UTC

[camel] branch master updated: CAMEL-13559: Added support for change data capture to the camel-salesforce component

This is an automated email from the ASF dual-hosted git repository.

aldettinger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eb5179  CAMEL-13559: Added support for change data capture to the camel-salesforce component
3eb5179 is described below

commit 3eb51798c933eef69624437c6703001af27c1d75
Author: aldettinger <al...@gmail.com>
AuthorDate: Wed May 29 14:29:44 2019 +0200

    CAMEL-13559: Added support for change data capture to the camel-salesforce component
---
 .../camel-salesforce-component/README.md           |   6 +-
 .../src/main/docs/salesforce-component.adoc        |  27 +++++-
 .../component/salesforce/SalesforceConsumer.java   |  51 +++++++++-
 .../component/salesforce/SalesforceEndpoint.java   |   2 +-
 .../ChangeEventsConsumerIntegrationTest.java       | 105 +++++++++++++++++++++
 .../salesforce/SalesforceConsumerTest.java         | 103 ++++++++++++++++++--
 .../salesforce/applications/Warehouse.app          |  12 +--
 .../resources/salesforce/connectedApps/.gitignore  |   1 +
 .../CamelSalesforceIntegrationTests.connectedApp   |  29 ------
 .../it/resources/salesforce/package.xml            |   6 +-
 .../ChangeEvents.platformEventChannel}             |  17 ++--
 11 files changed, 298 insertions(+), 61 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/README.md b/components/camel-salesforce/camel-salesforce-component/README.md
index 943c64e..b877b34 100644
--- a/components/camel-salesforce/camel-salesforce-component/README.md
+++ b/components/camel-salesforce/camel-salesforce-component/README.md
@@ -85,7 +85,8 @@ We have integrated _Salesforce Migration Tool_ into the Maven _integration-test_
 
 Before running the tests for the first time run the migration by invoking Maven from the `camel-salesforce-component` Maven module directory:
 
-    $ mvn -Pintegration antrun:run@setup-salesforce-instance
+    $ mvn -Pintegration resources:copy-resources@copy-test-salesforce-login-properties \
+      resources:copy-resources@set-connected-app-client-id antrun:run@setup-salesforce-instance
 
 This will create a _Connected App_ with predefined Consumer Key (the one mentioned in the comment one in `test-salesforce-login.properties.sample`) and _Consumer Secret_ (`clientSecret`) with the name of `CamelSalesforceIntegrationTests`.
 
@@ -109,9 +110,10 @@ Install the Warehouse package, tested with _Spring 2013_ (version 1.2) that can
  - add custom field `Shipping_Location` of type `GeoLocation` on the `Account` object
  - add custom field `Units_Sold` of `Number` type with maximum length of `18` on the `Line_Item` object
  - delete custom fields `Quantity`, `Invoice`, `Line_Item_Total` from the `Line_Item` object
- - delete custom field `Quantity` from the `Merchanidise` object, you will need to delete dependencies (ApexClass and Visualforce Page)
+ - delete custom field `Quantity` from the `Merchandise` object, you will need to delete dependencies (ApexClass and Visualforce Page)
  - create new ApexClass named `MerchandiseRestResource` with the content of `MerchandiseRestResource.apxc`
  - deactivate the `Contact Duplicate Rule` in salesforce configuration
+ - add `Account` to selected entities for change data capture in the _Integrations/Change Data Capture_ menu
 
 You'll need to access a Merchandise record and run a `Test Report` in order for them to appear in _Recent Items_ and _Recent Reports_. Do this by accessing _Warehouse_ application from the menu in the top right, and selecting _Merchandise_ click _Go!_ (preselected is View: _All_) and click on the single Merchandise item available. Next go to Reports and select and run _Test Report_ from _Test Reports_. This is needed by the integration tests as they access recent items and recently run reports.
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index c3d6f0a..08f1f25 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -277,6 +277,29 @@ For example, in the simplest form to consume one event:
 PlatformEvent event = consumer.receiveBody("salesforce:event/Order_Event__e", PlatformEvent.class);
 ----
 
+==== Change data capture events
+
+On the one hand, Salesforce could be configured to emit notifications for record changes of select objects.
+On the other hand, the Camel Salesforce component could react to such notifications, allowing for instance to
+https://trailhead.salesforce.com/en/content/learn/modules/change-data-capture/understand-change-data-capture[synchronize those changes into an external system].
+
+The notifications of interest could be specified in the `from("salesforce:XXX")` clause of a Camel route via the subscription channel, e.g:
+
+[source,java]
+----
+from("salesforce:data/ChangeEvents?replayId=-1").log("being notified of all change events")
+from("salesforce:data/AccountChangeEvent?replayId=-1").log("being notified of change events for Account records")
+from("salesforce:data/Employee__ChangeEvent?replayId=-1").log("being notified of change events for Employee__c custom object")
+----
+
+The received message contains either `java.util.Map<String,Object>` or `org.cometd.bayeux.Message` in the body depending on the `rawPayload` being `false` or `true` respectively. The `CamelSalesforceChangeType` header could be valued to one of `CREATE`, `UPDATE`,  `DELETE` or `UNDELETE`.
+
+More details about how to use the Camel Salesforce component change data capture capabilities could be found in the https://github.com/apache/camel/tree/master/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java[ChangeEventsConsumerIntegrationTest].
+
+The https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm[Salesforce developer guide]
+is a good fit to better know the subtleties of implementing a change data capture integration application.
+The dynamic nature of change event body fields, high level replication steps as well as security considerations could be of interest.
+
 === Examples
 
 ==== Uploading a document to a ContentWorkspace
@@ -585,7 +608,7 @@ generated Java Enumerations.
 
 Please refer to
 https://github.com/apache/camel/tree/master/components/camel-salesforce/camel-salesforce-maven-plugin[README.md]
-for details on how to generated the DTO.
+for details on how to generate the DTO.
 
 === Options
 
@@ -661,7 +684,7 @@ with the following path and query parameters:
 |===
 | Name | Description | Default | Type
 | *operationName* | The operation to use |  | OperationName
-| *topicName* | The name of the topic to use |  | String
+| *topicName* | The name of the topic/channel to use |  | String
 |===
 
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index 23b017c..8e3ea4a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -43,11 +43,13 @@ import org.cometd.bayeux.client.ClientSessionChannel;
 public class SalesforceConsumer extends DefaultConsumer {
 
     private enum MessageKind {
-        PLATFORM_EVENT, PUSH_TOPIC;
+        CHANGE_EVENT, PLATFORM_EVENT, PUSH_TOPIC;
 
         public static MessageKind fromTopicName(final String topicName) {
             if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
                 return MessageKind.PLATFORM_EVENT;
+            } else if (topicName.startsWith("data/") || topicName.startsWith("/data/")) {
+                return MessageKind.CHANGE_EVENT;
             }
 
             return PUSH_TOPIC;
@@ -58,7 +60,9 @@ public class SalesforceConsumer extends DefaultConsumer {
     private static final String EVENT_PROPERTY = "event";
     private static final double MINIMUM_VERSION = 24.0;
     private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
+    private static final String PAYLOAD_PROPERTY = "payload";
     private static final String REPLAY_ID_PROPERTY = "replayId";
+    private static final String SCHEMA_PROPERTY = "schema";
     private static final String SOBJECT_PROPERTY = "sobject";
     private static final String TYPE_PROPERTY = "type";
 
@@ -140,6 +144,9 @@ public class SalesforceConsumer extends DefaultConsumer {
         case PLATFORM_EVENT:
             createPlatformEventMessage(message, in);
             break;
+        case CHANGE_EVENT:
+            createChangeEventMessage(message, in);
+            break;
         default:
             throw new IllegalStateException("Unknown message kind: " + messageKind);
         }
@@ -167,23 +174,59 @@ public class SalesforceConsumer extends DefaultConsumer {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    void createChangeEventMessage(final Message message, final org.apache.camel.Message in) {
+        setHeaders(in, message);
+
+        final Map<String, Object> data = message.getDataAsMap();
+
+        final Map<String, Object> event = (Map<String, Object>)data.get(EVENT_PROPERTY);
+        final Object replayId = event.get(REPLAY_ID_PROPERTY);
+        if (replayId != null) {
+            in.setHeader("CamelSalesforceReplayId", replayId);
+        }
+
+        in.setHeader("CamelSalesforceChangeEventSchema", data.get(SCHEMA_PROPERTY));
+        in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
+
+        final Map<String, Object> payload = (Map<String, Object>)data.get(PAYLOAD_PROPERTY);
+        final Map<String, Object> changeEventHeader = (Map<String, Object>)payload.get("ChangeEventHeader");
+        in.setHeader("CamelSalesforceChangeType", changeEventHeader.get("changeType"));
+        in.setHeader("CamelSalesforceChangeOrigin", changeEventHeader.get("changeOrigin"));
+        in.setHeader("CamelSalesforceTransactionKey", changeEventHeader.get("transactionKey"));
+        in.setHeader("CamelSalesforceSequenceNumber", changeEventHeader.get("sequenceNumber"));
+        in.setHeader("CamelSalesforceIsTransactionEnd", changeEventHeader.get("isTransactionEnd"));
+        in.setHeader("CamelSalesforceCommitTimestamp", changeEventHeader.get("commitTimestamp"));
+        in.setHeader("CamelSalesforceCommitUser", changeEventHeader.get("commitUser"));
+        in.setHeader("CamelSalesforceCommitNumber", changeEventHeader.get("commitNumber"));
+        in.setHeader("CamelSalesforceEntityName", changeEventHeader.get("entityName"));
+        in.setHeader("CamelSalesforceRecordIds", changeEventHeader.get("recordIds"));
+
+        if (rawPayload) {
+            in.setBody(message);
+        } else {
+            payload.remove("ChangeEventHeader");
+            in.setBody(payload);
+        }
+    }
+
     void createPlatformEventMessage(final Message message, final org.apache.camel.Message in) {
         setHeaders(in, message);
 
         final Map<String, Object> data = message.getDataAsMap();
 
         @SuppressWarnings("unchecked")
-        final Map<String, Object> event = (Map<String, Object>) data.get("event");
+        final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY);
 
         final Object replayId = event.get(REPLAY_ID_PROPERTY);
         if (replayId != null) {
             in.setHeader("CamelSalesforceReplayId", replayId);
         }
 
-        in.setHeader("CamelSalesforcePlatformEventSchema", data.get("schema"));
+        in.setHeader("CamelSalesforcePlatformEventSchema", data.get(SCHEMA_PROPERTY));
         in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
 
-        final Object payload = data.get("payload");
+        final Object payload = data.get(PAYLOAD_PROPERTY);
 
         final PlatformEvent platformEvent = objectMapper.convertValue(payload, PlatformEvent.class);
         in.setHeader("CamelSalesforceCreatedDate", platformEvent.getCreated());
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index 5033eea..ec3b5a1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -42,7 +42,7 @@ public class SalesforceEndpoint extends DefaultEndpoint {
         + "executeAsyncReport,getReportInstances,getReportResults,limits,approval,approvals,composite-tree,"
         + "composite-batch,composite")
     private final OperationName operationName;
-    @UriPath(label = "consumer", description = "The name of the topic to use")
+    @UriPath(label = "consumer", description = "The name of the topic/channel to use")
     private final String topicName;
     @UriParam
     private final SalesforceEndpointConfig config;
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java
new file mode 100644
index 0000000..268a0108
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.salesforce.api.dto.CreateSObjectResult;
+import org.apache.camel.component.salesforce.dto.generated.Account;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * During integration tests setup, Salesforce has been configured to fire change
+ * events for Account objects. This test merely uses some API calls to trigger
+ * some change events, and then perform assertion on the received events.
+ */
+public class ChangeEventsConsumerIntegrationTest extends AbstractSalesforceTestBase {
+
+    private static final String ACCOUNT_NAME = "ChangeEventsConsumerIntegrationTest-TestAccount";
+    private static final String ACCOUNT_DESCRIPTION = "Account used to check that creation, update and deletion fire change events";
+
+    @EndpointInject(value = "mock:capturedChangeEvents")
+    private MockEndpoint capturedChangeEvents;
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void accountChangesShouldTriggerChangeEvents() {
+        // Trigger a CREATE event for an Account
+        final Account account = new Account();
+        account.setName(ACCOUNT_NAME);
+        final CreateSObjectResult result = template.requestBody("salesforce:createSObject?sObjectName=Account", account, CreateSObjectResult.class);
+        Assert.assertNotNull(result.getId());
+
+        // Trigger an UPDATE event for an Account
+        account.setDescription(ACCOUNT_DESCRIPTION);
+        account.setId(result.getId());
+        template.sendBody("salesforce:updateSObject", account);
+
+        // Trigger a DELETE event for an Account
+        template.sendBody("salesforce:deleteSObject", account);
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assertions.assertThat(capturedChangeEvents.assertExchangeReceived(2) != null).isTrue();
+        });
+
+        final Message createEvent = capturedChangeEvents.getExchanges().get(0).getIn();
+        Assert.assertNotNull(createEvent);
+        Assert.assertEquals("CREATE", createEvent.getHeader("CamelSalesforceChangeType"));
+        final Map<String, Object> createEventBody = (Map<String, Object>)createEvent.getBody(Map.class);
+        Assert.assertNotNull(createEventBody);
+        Assert.assertEquals(ACCOUNT_NAME, createEventBody.get("Name"));
+        Assert.assertFalse(createEventBody.containsKey("Description"));
+
+        final Message updateEvent = capturedChangeEvents.getExchanges().get(1).getIn();
+        Assert.assertNotNull(updateEvent);
+        Assert.assertEquals("UPDATE", updateEvent.getHeader("CamelSalesforceChangeType"));
+        final Map<String, Object> updateEventBody = (Map<String, Object>)updateEvent.getBody(Map.class);
+        Assert.assertNotNull(updateEventBody);
+        Assert.assertFalse(updateEventBody.containsKey("Name"));
+        Assert.assertEquals(ACCOUNT_DESCRIPTION, updateEventBody.get("Description"));
+
+        final Message deleteEvent = capturedChangeEvents.getExchanges().get(2).getIn();
+        Assert.assertNotNull(deleteEvent);
+        Assert.assertEquals("DELETE", deleteEvent.getHeader("CamelSalesforceChangeType"));
+        final Map<String, Object> deleteEventBody = (Map<String, Object>)deleteEvent.getBody(Map.class);
+        Assert.assertFalse(deleteEventBody.containsKey("Name"));
+        Assert.assertFalse(deleteEventBody.containsKey("Description"));
+    }
+
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("salesforce:data/ChangeEvents?replayId=-1").to(capturedChangeEvents);
+            }
+        };
+    }
+
+    @Override
+    protected String salesforceApiVersionToUse() {
+        return "45.0";
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
index 30a0e50..dffd12d 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
@@ -35,18 +35,23 @@ import org.cometd.bayeux.client.ClientSessionChannel;
 import org.cometd.common.HashMapMessage;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public class SalesforceConsumerTest {
 
     public static class AccountUpdates {
-
         @JsonProperty("Id")
         String id;
 
@@ -77,17 +82,21 @@ public class SalesforceConsumerTest {
     static final SubscriptionHelper NOT_USED = null;
 
     SalesforceEndpointConfig configuration = new SalesforceEndpointConfig();
-
     SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
-
     Exchange exchange = mock(Exchange.class);
-
     org.apache.camel.Message in = mock(org.apache.camel.Message.class);
-
     AsyncProcessor processor = mock(AsyncProcessor.class);
-
     Message pushTopicMessage;
 
+    @Mock
+    private Message mockChangeEvent;
+    @Mock
+    private Map<String, Object> mockChangeEventPayload;
+    @Mock
+    private Map<String, Object> mockChangeEventData;
+    @Mock
+    private Map<String, Object> mockChangeEventMap;
+
     @Before
     public void setupMocks() {
         when(endpoint.getConfiguration()).thenReturn(configuration);
@@ -102,6 +111,33 @@ public class SalesforceConsumerTest {
         when(classResolver.resolveClass(AccountUpdates.class.getName())).thenReturn((Class) AccountUpdates.class);
 
         pushTopicMessage = createPushTopicMessage();
+
+        setupMockChangeEvent();
+    }
+
+    private void setupMockChangeEvent() {
+        final Map<String, Object> changeEventHeader = new HashMap<>();
+        changeEventHeader.put("changeType", "CREATE");
+        changeEventHeader.put("changeOrigin", "com/salesforce/api/rest/45.0");
+        changeEventHeader.put("transactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8");
+        changeEventHeader.put("sequenceNumber", 1L);
+        changeEventHeader.put("isTransactionEnd", Boolean.TRUE);
+        changeEventHeader.put("commitTimestamp", 1558949299000L);
+        changeEventHeader.put("commitUser", "0052p000009cl8uBBB");
+        changeEventHeader.put("commitNumber", 10585193272713L);
+        changeEventHeader.put("entityName", "Account");
+        changeEventHeader.put("recordIds", new Object[] {"0012p00002HHpNlAAL"});
+
+        when(mockChangeEventPayload.get("ChangeEventHeader")).thenReturn(changeEventHeader);
+
+        when(mockChangeEventMap.get("replayId")).thenReturn(4L);
+
+        when(mockChangeEventData.get("schema")).thenReturn("30H2pgzuWcF844p26Ityvg");
+        when(mockChangeEventData.get("payload")).thenReturn(mockChangeEventPayload);
+        when(mockChangeEventData.get("event")).thenReturn(mockChangeEventMap);
+
+        when(mockChangeEvent.getDataAsMap()).thenReturn(mockChangeEventData);
+        when(mockChangeEvent.getChannel()).thenReturn("/data/AccountChangeEvent");
     }
 
     @Test
@@ -240,6 +276,61 @@ public class SalesforceConsumerTest {
         verifyNoMoreInteractions(in, processor);
     }
 
+    @Test
+    public void shouldProcessChangeEvents() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+        verify(in).setBody(mockChangeEventPayload);
+        verify(in).setHeader("CamelSalesforceChannel", "/data/AccountChangeEvent");
+        verify(in).setHeader("CamelSalesforceReplayId", 4L);
+        verify(in).setHeader("CamelSalesforceChangeEventSchema", "30H2pgzuWcF844p26Ityvg");
+        verify(in).setHeader("CamelSalesforceEventType", "AccountChangeEvent");
+        verify(in).setHeader("CamelSalesforceChangeType", "CREATE");
+        verify(in).setHeader("CamelSalesforceChangeOrigin", "com/salesforce/api/rest/45.0");
+        verify(in).setHeader("CamelSalesforceTransactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8");
+        verify(in).setHeader("CamelSalesforceSequenceNumber", 1L);
+        verify(in).setHeader("CamelSalesforceIsTransactionEnd", Boolean.TRUE);
+        verify(in).setHeader("CamelSalesforceCommitTimestamp", 1558949299000L);
+        verify(in).setHeader("CamelSalesforceCommitUser", "0052p000009cl8uBBB");
+        verify(in).setHeader("CamelSalesforceCommitNumber", 10585193272713L);
+        verify(in).setHeader("CamelSalesforceEntityName", "Account");
+        verify(in).setHeader("CamelSalesforceRecordIds", new Object[] {"0012p00002HHpNlAAL"});
+
+        verify(mockChangeEventPayload).remove("ChangeEventHeader");
+
+        verify(processor).process(same(exchange), any());
+
+        verifyNoMoreInteractions(in, processor);
+    }
+
+    @Test
+    public void processNoReplayIdChangeEventsShouldNotSetReplayIdHeader() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+        when(mockChangeEventMap.get("replayId")).thenReturn(null);
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+        verify(in, never()).setHeader(eq("CamelSalesforceReplayId"), any());
+    }
+
+    @Test
+    public void processRawPayloadChangeEventsShouldSetInputMessageAsBody() throws Exception {
+        when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+        configuration.setRawPayload(true);
+
+        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+        consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+        verify(in).setBody(mockChangeEvent);
+    }
+
     static Message createPushTopicMessage() {
         final Message pushTopicMessage = new HashMapMessage();
         pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
diff --git a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app b/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
index bb76152..0293342 100644
--- a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
+++ b/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
@@ -18,10 +18,10 @@
 <CustomApplication xmlns="http://soap.sforce.com/2006/04/metadata">
     <defaultLandingTab>Merchandise__c</defaultLandingTab>
     <label>Warehouse</label>
-    <tab>standard-Chatter</tab>
-    <tab>standard-File</tab>
-    <tab>standard-report</tab>
-    <tab>standard-Dashboard</tab>
-    <tab>Invoice__c</tab>
-    <tab>Merchandise__c</tab>
+    <tabs>standard-Chatter</tabs>
+    <tabs>standard-File</tabs>
+    <tabs>standard-report</tabs>
+    <tabs>standard-Dashboard</tabs>
+    <tabs>Invoice__c</tabs>
+    <tabs>Merchandise__c</tabs>
 </CustomApplication>
diff --git a/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore b/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore
new file mode 100644
index 0000000..464cef4
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore
@@ -0,0 +1 @@
+CamelSalesforceIntegrationTests.connectedApp
diff --git a/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp b/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp
deleted file mode 100644
index 0ca3469..0000000
--- a/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<ConnectedApp xmlns="http://soap.sforce.com/2006/04/metadata">
-    <contactEmail>cbactrianus@gmail.com</contactEmail>
-    <label>CamelSalesforceIntegrationTests</label>
-    <oauthConfig>
-        <callbackUrl>https://login.salesforce.com/services/oauth2/success</callbackUrl>
-        <certificate>MIIC1TCCAb2gAwIBAgIEM3ZMGjANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDEw9TYWxlc2ZvcmNlIHRlc3QwIBcNMTcwMzE0MjIxMjU0WhgPMjExNzAyMTgyMjEyNTRaMBoxGDAWBgNVBAMTD1NhbGVzZm9yY2UgdGVzdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMbZinVCZerzZvyyQlfE6nMpEQRfVsjpcfT01UTG/bwzWorP7YRpGkDW7Q4eu6IPrHtohkhM3JtsSVka5jfS1iEguMXLdNkEyMjMiBrWJeyGfcISF1yazgqLxxwcwGjMn3C9xV5tBxiRSqtMrV1iRx3fxmLue1UnZjSyUaG+Vi+FcKxqre5ixApeDCZHLONxBy3mjWK4GIeBBbUqQqy3LNrT6B34WdNX8vTslpTKOlmLyycEI/Rx+A4lNaultrJHdnRhGBr [...]
-        <consumerKey>3MVG9szVa2RxsqBZXHfqsW3hf9HQp_N6qdSmpjKMzSJaEL4UP161JlDkE32EigL82ra_jM1WuQgF4rYDgzL3u</consumerKey>
-        <consumerSecret>1039611643161946846</consumerSecret>
-        <scopes>Api</scopes>
-        <scopes>RefreshToken</scopes>
-    </oauthConfig>
-</ConnectedApp>
diff --git a/components/camel-salesforce/it/resources/salesforce/package.xml b/components/camel-salesforce/it/resources/salesforce/package.xml
index 448f80f..bb974e6 100644
--- a/components/camel-salesforce/it/resources/salesforce/package.xml
+++ b/components/camel-salesforce/it/resources/salesforce/package.xml
@@ -87,5 +87,9 @@
         <members>CamelSalesforceIntegrationTests</members>
         <name>ConnectedApp</name>
     </types>
-    <version>41.0</version>
+    <types>
+        <members>ChangeEvents</members>
+        <name>PlatformEventChannel</name>
+    </types>
+    <version>45.0</version>
 </Package>
diff --git a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app b/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
similarity index 69%
copy from components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
copy to components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
index bb76152..e309403 100644
--- a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
+++ b/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
@@ -15,13 +15,10 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<CustomApplication xmlns="http://soap.sforce.com/2006/04/metadata">
-    <defaultLandingTab>Merchandise__c</defaultLandingTab>
-    <label>Warehouse</label>
-    <tab>standard-Chatter</tab>
-    <tab>standard-File</tab>
-    <tab>standard-report</tab>
-    <tab>standard-Dashboard</tab>
-    <tab>Invoice__c</tab>
-    <tab>Merchandise__c</tab>
-</CustomApplication>
+<PlatformEventChannel xmlns="http://soap.sforce.com/2006/04/metadata">
+    <channelMembers>
+        <selectedEntity>AccountChangeEvent</selectedEntity>
+    </channelMembers>
+    <channelType>data</channelType>
+    <label>ChangeEvents</label>
+</PlatformEventChannel>
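
Note on the consumer change above: the new CHANGE_EVENT branch selects on the topic name prefix, copies values from the ChangeEventHeader map into Camel headers, and, unless rawPayload is set, delivers the payload without that header map as the body. The following is a minimal standalone sketch of that logic using only java.util, with no Camel or CometD types. The class name ChangeEventSketch and the helpers kindOf, changeTypeOf and bodyOf are hypothetical and do not appear in the commit. Unlike the committed code, which removes ChangeEventHeader from the received payload map in place, this sketch works on a copy.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangeEventSketch {

    enum Kind { CHANGE_EVENT, PLATFORM_EVENT, PUSH_TOPIC }

    // Same prefix checks as MessageKind.fromTopicName in the diff above.
    static Kind kindOf(final String topicName) {
        if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
            return Kind.PLATFORM_EVENT;
        } else if (topicName.startsWith("data/") || topicName.startsWith("/data/")) {
            return Kind.CHANGE_EVENT;
        }
        return Kind.PUSH_TOPIC;
    }

    // Hypothetical helper: reads changeType from payload.ChangeEventHeader,
    // the value the commit exposes as the CamelSalesforceChangeType header.
    @SuppressWarnings("unchecked")
    static Object changeTypeOf(final Map<String, Object> data) {
        final Map<String, Object> payload = (Map<String, Object>) data.get("payload");
        final Map<String, Object> header = (Map<String, Object>) payload.get("ChangeEventHeader");
        return header.get("changeType");
    }

    // Hypothetical helper: returns the payload fields without ChangeEventHeader.
    // Assumption: a copy is made here so the input map is left unchanged.
    @SuppressWarnings("unchecked")
    static Map<String, Object> bodyOf(final Map<String, Object> data) {
        final Map<String, Object> payload = (Map<String, Object>) data.get("payload");
        final Map<String, Object> body = new HashMap<>(payload);
        body.remove("ChangeEventHeader");
        return body;
    }

    public static void main(final String[] args) {
        // Sample data shaped like the mock change event in SalesforceConsumerTest.
        final Map<String, Object> header = new HashMap<>();
        header.put("changeType", "CREATE");
        header.put("entityName", "Account");

        final Map<String, Object> payload = new HashMap<>();
        payload.put("ChangeEventHeader", header);
        payload.put("Name", "Sample account");

        final Map<String, Object> data = new HashMap<>();
        data.put("payload", payload);

        System.out.println(kindOf("data/ChangeEvents"));
        System.out.println(changeTypeOf(data));
        System.out.println(bodyOf(data));
    }
}
```

In a real route the component performs these steps itself; a processor would only need to read the CamelSalesforceChangeType header and the Map body.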