Posted to commits@camel.apache.org by al...@apache.org on 2019/06/03 07:44:52 UTC
[camel] branch master updated: CAMEL-13559: Added support for change data capture to the camel-salesforce component
This is an automated email from the ASF dual-hosted git repository.
aldettinger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb5179 CAMEL-13559: Added support for change data capture to the camel-salesforce component
3eb5179 is described below
commit 3eb51798c933eef69624437c6703001af27c1d75
Author: aldettinger <al...@gmail.com>
AuthorDate: Wed May 29 14:29:44 2019 +0200
CAMEL-13559: Added support for change data capture to the camel-salesforce component
---
.../camel-salesforce-component/README.md | 6 +-
.../src/main/docs/salesforce-component.adoc | 27 +++++-
.../component/salesforce/SalesforceConsumer.java | 51 +++++++++-
.../component/salesforce/SalesforceEndpoint.java | 2 +-
.../ChangeEventsConsumerIntegrationTest.java | 105 +++++++++++++++++++++
.../salesforce/SalesforceConsumerTest.java | 103 ++++++++++++++++++--
.../salesforce/applications/Warehouse.app | 12 +--
.../resources/salesforce/connectedApps/.gitignore | 1 +
.../CamelSalesforceIntegrationTests.connectedApp | 29 ------
.../it/resources/salesforce/package.xml | 6 +-
.../ChangeEvents.platformEventChannel} | 17 ++--
11 files changed, 298 insertions(+), 61 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/README.md b/components/camel-salesforce/camel-salesforce-component/README.md
index 943c64e..b877b34 100644
--- a/components/camel-salesforce/camel-salesforce-component/README.md
+++ b/components/camel-salesforce/camel-salesforce-component/README.md
@@ -85,7 +85,8 @@ We have integrated _Salesforce Migration Tool_ into the Maven _integration-test_
Before running the tests for the first time run the migration by invoking Maven from the `camel-salesforce-component` Maven module directory:
- $ mvn -Pintegration antrun:run@setup-salesforce-instance
+ $ mvn -Pintegration resources:copy-resources@copy-test-salesforce-login-properties \
+ resources:copy-resources@set-connected-app-client-id antrun:run@setup-salesforce-instance
This will create a _Connected App_ with predefined Consumer Key (the one mentioned in the comment one in `test-salesforce-login.properties.sample`) and _Consumer Secret_ (`clientSecret`) with the name of `CamelSalesforceIntegrationTests`.
@@ -109,9 +110,10 @@ Install the Warehouse package, tested with _Spring 2013_ (version 1.2) that can
- add custom field `Shipping_Location` of type `GeoLocation` on the `Account` object
- add custom field `Units_Sold` of `Number` type with maximum length of `18` on the `Line_Item` object
- delete custom fields `Quantity`, `Invoice`, `Line_Item_Total` from the `Line_Item` object
- - delete custom field `Quantity` from the `Merchanidise` object, you will need to delete dependencies (ApexClass and Visualforce Page)
+ - delete custom field `Quantity` from the `Merchandise` object, you will need to delete dependencies (ApexClass and Visualforce Page)
- create new ApexClass named `MerchandiseRestResource` with the content of `MerchandiseRestResource.apxc`
- deactivate the `Contact Duplicate Rule` in salesforce configuration
+ - add `Account` to selected entities for change data capture in the _Integrations/Change Data Capture_ menu
You'll need to access a Merchandise record and run a `Test Report` in order for them to appear in _Recent Items_ and _Recent Reports_. Do this by accessing _Warehouse_ application from the menu in the top right, and selecting _Merchandise_ click _Go!_ (preselected is View: _All_) and click on the single Merchandise item available. Next go to Reports and select and run _Test Report_ from _Test Reports_. This is needed by the integration tests as they access recent items and recently run reports.
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index c3d6f0a..08f1f25 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -277,6 +277,29 @@ For example, in the simplest form to consume one event:
PlatformEvent event = consumer.receiveBody("salesforce:event/Order_Event__e", PlatformEvent.class);
----
+==== Change data capture events
+
+Salesforce can be configured to emit notifications for record changes of selected objects.
+The Camel Salesforce component can react to such notifications, for instance to
+https://trailhead.salesforce.com/en/content/learn/modules/change-data-capture/understand-change-data-capture[synchronize those changes into an external system].
+
+The notifications of interest can be specified in the `from("salesforce:XXX")` clause of a Camel route via the subscription channel, e.g.:
+
+[source,java]
+----
+from("salesforce:data/ChangeEvents?replayId=-1").log("being notified of all change events")
+from("salesforce:data/AccountChangeEvent?replayId=-1").log("being notified of change events for Account records")
+from("salesforce:data/Employee__ChangeEvent?replayId=-1").log("being notified of change events for Employee__c custom object")
+----
+
+The body of the received message is either a `java.util.Map<String,Object>` or an `org.cometd.bayeux.Message`, depending on whether `rawPayload` is `false` or `true` respectively. The `CamelSalesforceChangeType` header may take one of the values `CREATE`, `UPDATE`, `DELETE` or `UNDELETE`.
+
+More details about how to use the change data capture capabilities of the Camel Salesforce component can be found in the https://github.com/apache/camel/tree/master/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java[ChangeEventsConsumerIntegrationTest].
+
+The https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_intro.htm[Salesforce developer guide]
+covers the subtleties of implementing a change data capture integration application.
+The dynamic nature of change event body fields, the high-level replication steps and the security considerations may be of particular interest.
+
=== Examples
==== Uploading a document to a ContentWorkspace
@@ -585,7 +608,7 @@ generated Java Enumerations.
Please refer to
https://github.com/apache/camel/tree/master/components/camel-salesforce/camel-salesforce-maven-plugin[README.md]
-for details on how to generated the DTO.
+for details on how to generate the DTO.
=== Options
@@ -661,7 +684,7 @@ with the following path and query parameters:
|===
| Name | Description | Default | Type
| *operationName* | The operation to use | | OperationName
-| *topicName* | The name of the topic to use | | String
+| *topicName* | The name of the topic/channel to use | | String
|===
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index 23b017c..8e3ea4a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -43,11 +43,13 @@ import org.cometd.bayeux.client.ClientSessionChannel;
public class SalesforceConsumer extends DefaultConsumer {
private enum MessageKind {
- PLATFORM_EVENT, PUSH_TOPIC;
+ CHANGE_EVENT, PLATFORM_EVENT, PUSH_TOPIC;
public static MessageKind fromTopicName(final String topicName) {
if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
return MessageKind.PLATFORM_EVENT;
+ } else if (topicName.startsWith("data/") || topicName.startsWith("/data/")) {
+ return MessageKind.CHANGE_EVENT;
}
return PUSH_TOPIC;
@@ -58,7 +60,9 @@ public class SalesforceConsumer extends DefaultConsumer {
private static final String EVENT_PROPERTY = "event";
private static final double MINIMUM_VERSION = 24.0;
private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
+ private static final String PAYLOAD_PROPERTY = "payload";
private static final String REPLAY_ID_PROPERTY = "replayId";
+ private static final String SCHEMA_PROPERTY = "schema";
private static final String SOBJECT_PROPERTY = "sobject";
private static final String TYPE_PROPERTY = "type";
@@ -140,6 +144,9 @@ public class SalesforceConsumer extends DefaultConsumer {
case PLATFORM_EVENT:
createPlatformEventMessage(message, in);
break;
+ case CHANGE_EVENT:
+ createChangeEventMessage(message, in);
+ break;
default:
throw new IllegalStateException("Unknown message kind: " + messageKind);
}
@@ -167,23 +174,59 @@ public class SalesforceConsumer extends DefaultConsumer {
}
}
+ @SuppressWarnings("unchecked")
+ void createChangeEventMessage(final Message message, final org.apache.camel.Message in) {
+ setHeaders(in, message);
+
+ final Map<String, Object> data = message.getDataAsMap();
+
+ final Map<String, Object> event = (Map<String, Object>)data.get(EVENT_PROPERTY);
+ final Object replayId = event.get(REPLAY_ID_PROPERTY);
+ if (replayId != null) {
+ in.setHeader("CamelSalesforceReplayId", replayId);
+ }
+
+ in.setHeader("CamelSalesforceChangeEventSchema", data.get(SCHEMA_PROPERTY));
+ in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
+
+ final Map<String, Object> payload = (Map<String, Object>)data.get(PAYLOAD_PROPERTY);
+ final Map<String, Object> changeEventHeader = (Map<String, Object>)payload.get("ChangeEventHeader");
+ in.setHeader("CamelSalesforceChangeType", changeEventHeader.get("changeType"));
+ in.setHeader("CamelSalesforceChangeOrigin", changeEventHeader.get("changeOrigin"));
+ in.setHeader("CamelSalesforceTransactionKey", changeEventHeader.get("transactionKey"));
+ in.setHeader("CamelSalesforceSequenceNumber", changeEventHeader.get("sequenceNumber"));
+ in.setHeader("CamelSalesforceIsTransactionEnd", changeEventHeader.get("isTransactionEnd"));
+ in.setHeader("CamelSalesforceCommitTimestamp", changeEventHeader.get("commitTimestamp"));
+ in.setHeader("CamelSalesforceCommitUser", changeEventHeader.get("commitUser"));
+ in.setHeader("CamelSalesforceCommitNumber", changeEventHeader.get("commitNumber"));
+ in.setHeader("CamelSalesforceEntityName", changeEventHeader.get("entityName"));
+ in.setHeader("CamelSalesforceRecordIds", changeEventHeader.get("recordIds"));
+
+ if (rawPayload) {
+ in.setBody(message);
+ } else {
+ payload.remove("ChangeEventHeader");
+ in.setBody(payload);
+ }
+ }
+
void createPlatformEventMessage(final Message message, final org.apache.camel.Message in) {
setHeaders(in, message);
final Map<String, Object> data = message.getDataAsMap();
@SuppressWarnings("unchecked")
- final Map<String, Object> event = (Map<String, Object>) data.get("event");
+ final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY);
final Object replayId = event.get(REPLAY_ID_PROPERTY);
if (replayId != null) {
in.setHeader("CamelSalesforceReplayId", replayId);
}
- in.setHeader("CamelSalesforcePlatformEventSchema", data.get("schema"));
+ in.setHeader("CamelSalesforcePlatformEventSchema", data.get(SCHEMA_PROPERTY));
in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
- final Object payload = data.get("payload");
+ final Object payload = data.get(PAYLOAD_PROPERTY);
final PlatformEvent platformEvent = objectMapper.convertValue(payload, PlatformEvent.class);
in.setHeader("CamelSalesforceCreatedDate", platformEvent.getCreated());
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index 5033eea..ec3b5a1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -42,7 +42,7 @@ public class SalesforceEndpoint extends DefaultEndpoint {
+ "executeAsyncReport,getReportInstances,getReportResults,limits,approval,approvals,composite-tree,"
+ "composite-batch,composite")
private final OperationName operationName;
- @UriPath(label = "consumer", description = "The name of the topic to use")
+ @UriPath(label = "consumer", description = "The name of the topic/channel to use")
private final String topicName;
@UriParam
private final SalesforceEndpointConfig config;
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java
new file mode 100644
index 0000000..268a0108
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/ChangeEventsConsumerIntegrationTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.salesforce.api.dto.CreateSObjectResult;
+import org.apache.camel.component.salesforce.dto.generated.Account;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * During integration tests setup, Salesforce has been configured to fire change
+ * events for Account objects. This test merely uses some API calls to trigger
+ * some change events, and then perform assertion on the received events.
+ */
+public class ChangeEventsConsumerIntegrationTest extends AbstractSalesforceTestBase {
+
+ private static final String ACCOUNT_NAME = "ChangeEventsConsumerIntegrationTest-TestAccount";
+ private static final String ACCOUNT_DESCRIPTION = "Account used to check that creation, update and deletion fire change events";
+
+ @EndpointInject(value = "mock:capturedChangeEvents")
+ private MockEndpoint capturedChangeEvents;
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void accountChangesShouldTriggerChangeEvents() {
+ // Trigger a CREATE event for an Account
+ final Account account = new Account();
+ account.setName(ACCOUNT_NAME);
+ final CreateSObjectResult result = template.requestBody("salesforce:createSObject?sObjectName=Account", account, CreateSObjectResult.class);
+ Assert.assertNotNull(result.getId());
+
+ // Trigger an UPDATE event for an Account
+ account.setDescription(ACCOUNT_DESCRIPTION);
+ account.setId(result.getId());
+ template.sendBody("salesforce:updateSObject", account);
+
+ // Trigger a DELETE event for an Account
+ template.sendBody("salesforce:deleteSObject", account);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ Assertions.assertThat(capturedChangeEvents.assertExchangeReceived(2) != null).isTrue();
+ });
+
+ final Message createEvent = capturedChangeEvents.getExchanges().get(0).getIn();
+ Assert.assertNotNull(createEvent);
+ Assert.assertEquals("CREATE", createEvent.getHeader("CamelSalesforceChangeType"));
+ final Map<String, Object> createEventBody = (Map<String, Object>)createEvent.getBody(Map.class);
+ Assert.assertNotNull(createEventBody);
+ Assert.assertEquals(ACCOUNT_NAME, createEventBody.get("Name"));
+ Assert.assertFalse(createEventBody.containsKey("Description"));
+
+ final Message updateEvent = capturedChangeEvents.getExchanges().get(1).getIn();
+ Assert.assertNotNull(updateEvent);
+ Assert.assertEquals("UPDATE", updateEvent.getHeader("CamelSalesforceChangeType"));
+ final Map<String, Object> updateEventBody = (Map<String, Object>)updateEvent.getBody(Map.class);
+ Assert.assertNotNull(updateEventBody);
+ Assert.assertFalse(updateEventBody.containsKey("Name"));
+ Assert.assertEquals(ACCOUNT_DESCRIPTION, updateEventBody.get("Description"));
+
+ final Message deleteEvent = capturedChangeEvents.getExchanges().get(2).getIn();
+ Assert.assertNotNull(deleteEvent);
+ Assert.assertEquals("DELETE", deleteEvent.getHeader("CamelSalesforceChangeType"));
+ final Map<String, Object> deleteEventBody = (Map<String, Object>)deleteEvent.getBody(Map.class);
+ Assert.assertFalse(deleteEventBody.containsKey("Name"));
+ Assert.assertFalse(deleteEventBody.containsKey("Description"));
+ }
+
+ @Override
+ protected RouteBuilder doCreateRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("salesforce:data/ChangeEvents?replayId=-1").to(capturedChangeEvents);
+ }
+ };
+ }
+
+ @Override
+ protected String salesforceApiVersionToUse() {
+ return "45.0";
+ }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
index 30a0e50..dffd12d 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
@@ -35,18 +35,23 @@ import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.common.HashMapMessage;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class SalesforceConsumerTest {
public static class AccountUpdates {
-
@JsonProperty("Id")
String id;
@@ -77,17 +82,21 @@ public class SalesforceConsumerTest {
static final SubscriptionHelper NOT_USED = null;
SalesforceEndpointConfig configuration = new SalesforceEndpointConfig();
-
SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
-
Exchange exchange = mock(Exchange.class);
-
org.apache.camel.Message in = mock(org.apache.camel.Message.class);
-
AsyncProcessor processor = mock(AsyncProcessor.class);
-
Message pushTopicMessage;
+ @Mock
+ private Message mockChangeEvent;
+ @Mock
+ private Map<String, Object> mockChangeEventPayload;
+ @Mock
+ private Map<String, Object> mockChangeEventData;
+ @Mock
+ private Map<String, Object> mockChangeEventMap;
+
@Before
public void setupMocks() {
when(endpoint.getConfiguration()).thenReturn(configuration);
@@ -102,6 +111,33 @@ public class SalesforceConsumerTest {
when(classResolver.resolveClass(AccountUpdates.class.getName())).thenReturn((Class) AccountUpdates.class);
pushTopicMessage = createPushTopicMessage();
+
+ setupMockChangeEvent();
+ }
+
+ private void setupMockChangeEvent() {
+ final Map<String, Object> changeEventHeader = new HashMap<>();
+ changeEventHeader.put("changeType", "CREATE");
+ changeEventHeader.put("changeOrigin", "com/salesforce/api/rest/45.0");
+ changeEventHeader.put("transactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8");
+ changeEventHeader.put("sequenceNumber", 1L);
+ changeEventHeader.put("isTransactionEnd", Boolean.TRUE);
+ changeEventHeader.put("commitTimestamp", 1558949299000L);
+ changeEventHeader.put("commitUser", "0052p000009cl8uBBB");
+ changeEventHeader.put("commitNumber", 10585193272713L);
+ changeEventHeader.put("entityName", "Account");
+ changeEventHeader.put("recordIds", new Object[] {"0012p00002HHpNlAAL"});
+
+ when(mockChangeEventPayload.get("ChangeEventHeader")).thenReturn(changeEventHeader);
+
+ when(mockChangeEventMap.get("replayId")).thenReturn(4L);
+
+ when(mockChangeEventData.get("schema")).thenReturn("30H2pgzuWcF844p26Ityvg");
+ when(mockChangeEventData.get("payload")).thenReturn(mockChangeEventPayload);
+ when(mockChangeEventData.get("event")).thenReturn(mockChangeEventMap);
+
+ when(mockChangeEvent.getDataAsMap()).thenReturn(mockChangeEventData);
+ when(mockChangeEvent.getChannel()).thenReturn("/data/AccountChangeEvent");
}
@Test
@@ -240,6 +276,61 @@ public class SalesforceConsumerTest {
verifyNoMoreInteractions(in, processor);
}
+ @Test
+ public void shouldProcessChangeEvents() throws Exception {
+ when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+ verify(in).setBody(mockChangeEventPayload);
+ verify(in).setHeader("CamelSalesforceChannel", "/data/AccountChangeEvent");
+ verify(in).setHeader("CamelSalesforceReplayId", 4L);
+ verify(in).setHeader("CamelSalesforceChangeEventSchema", "30H2pgzuWcF844p26Ityvg");
+ verify(in).setHeader("CamelSalesforceEventType", "AccountChangeEvent");
+ verify(in).setHeader("CamelSalesforceChangeType", "CREATE");
+ verify(in).setHeader("CamelSalesforceChangeOrigin", "com/salesforce/api/rest/45.0");
+ verify(in).setHeader("CamelSalesforceTransactionKey", "000bc577-90c7-0d33-cebb-971bb50d75b8");
+ verify(in).setHeader("CamelSalesforceSequenceNumber", 1L);
+ verify(in).setHeader("CamelSalesforceIsTransactionEnd", Boolean.TRUE);
+ verify(in).setHeader("CamelSalesforceCommitTimestamp", 1558949299000L);
+ verify(in).setHeader("CamelSalesforceCommitUser", "0052p000009cl8uBBB");
+ verify(in).setHeader("CamelSalesforceCommitNumber", 10585193272713L);
+ verify(in).setHeader("CamelSalesforceEntityName", "Account");
+ verify(in).setHeader("CamelSalesforceRecordIds", new Object[] {"0012p00002HHpNlAAL"});
+
+ verify(mockChangeEventPayload).remove("ChangeEventHeader");
+
+ verify(processor).process(same(exchange), any());
+
+ verifyNoMoreInteractions(in, processor);
+ }
+
+ @Test
+ public void processNoReplayIdChangeEventsShouldNotSetReplayIdHeader() throws Exception {
+ when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+ when(mockChangeEventMap.get("replayId")).thenReturn(null);
+
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+ verify(in, never()).setHeader(eq("CamelSalesforceReplayId"), any());
+ }
+
+ @Test
+ public void processRawPayloadChangeEventsShouldSetInputMessageAsBody() throws Exception {
+ when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
+ configuration.setRawPayload(true);
+
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
+
+ verify(in).setBody(mockChangeEvent);
+ }
+
static Message createPushTopicMessage() {
final Message pushTopicMessage = new HashMapMessage();
pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
diff --git a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app b/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
index bb76152..0293342 100644
--- a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
+++ b/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
@@ -18,10 +18,10 @@
<CustomApplication xmlns="http://soap.sforce.com/2006/04/metadata">
<defaultLandingTab>Merchandise__c</defaultLandingTab>
<label>Warehouse</label>
- <tab>standard-Chatter</tab>
- <tab>standard-File</tab>
- <tab>standard-report</tab>
- <tab>standard-Dashboard</tab>
- <tab>Invoice__c</tab>
- <tab>Merchandise__c</tab>
+ <tabs>standard-Chatter</tabs>
+ <tabs>standard-File</tabs>
+ <tabs>standard-report</tabs>
+ <tabs>standard-Dashboard</tabs>
+ <tabs>Invoice__c</tabs>
+ <tabs>Merchandise__c</tabs>
</CustomApplication>
diff --git a/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore b/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore
new file mode 100644
index 0000000..464cef4
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/connectedApps/.gitignore
@@ -0,0 +1 @@
+CamelSalesforceIntegrationTests.connectedApp
diff --git a/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp b/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp
deleted file mode 100644
index 0ca3469..0000000
--- a/components/camel-salesforce/it/resources/salesforce/connectedApps/CamelSalesforceIntegrationTests.connectedApp
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<ConnectedApp xmlns="http://soap.sforce.com/2006/04/metadata">
- <contactEmail>cbactrianus@gmail.com</contactEmail>
- <label>CamelSalesforceIntegrationTests</label>
- <oauthConfig>
- <callbackUrl>https://login.salesforce.com/services/oauth2/success</callbackUrl>
- <certificate>MIIC1TCCAb2gAwIBAgIEM3ZMGjANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDEw9TYWxlc2ZvcmNlIHRlc3QwIBcNMTcwMzE0MjIxMjU0WhgPMjExNzAyMTgyMjEyNTRaMBoxGDAWBgNVBAMTD1NhbGVzZm9yY2UgdGVzdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMbZinVCZerzZvyyQlfE6nMpEQRfVsjpcfT01UTG/bwzWorP7YRpGkDW7Q4eu6IPrHtohkhM3JtsSVka5jfS1iEguMXLdNkEyMjMiBrWJeyGfcISF1yazgqLxxwcwGjMn3C9xV5tBxiRSqtMrV1iRx3fxmLue1UnZjSyUaG+Vi+FcKxqre5ixApeDCZHLONxBy3mjWK4GIeBBbUqQqy3LNrT6B34WdNX8vTslpTKOlmLyycEI/Rx+A4lNaultrJHdnRhGBr [...]
- <consumerKey>3MVG9szVa2RxsqBZXHfqsW3hf9HQp_N6qdSmpjKMzSJaEL4UP161JlDkE32EigL82ra_jM1WuQgF4rYDgzL3u</consumerKey>
- <consumerSecret>1039611643161946846</consumerSecret>
- <scopes>Api</scopes>
- <scopes>RefreshToken</scopes>
- </oauthConfig>
-</ConnectedApp>
diff --git a/components/camel-salesforce/it/resources/salesforce/package.xml b/components/camel-salesforce/it/resources/salesforce/package.xml
index 448f80f..bb974e6 100644
--- a/components/camel-salesforce/it/resources/salesforce/package.xml
+++ b/components/camel-salesforce/it/resources/salesforce/package.xml
@@ -87,5 +87,9 @@
<members>CamelSalesforceIntegrationTests</members>
<name>ConnectedApp</name>
</types>
- <version>41.0</version>
+ <types>
+ <members>ChangeEvents</members>
+ <name>PlatformEventChannel</name>
+ </types>
+ <version>45.0</version>
</Package>
diff --git a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app b/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
similarity index 69%
copy from components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
copy to components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
index bb76152..e309403 100644
--- a/components/camel-salesforce/it/resources/salesforce/applications/Warehouse.app
+++ b/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
@@ -15,13 +15,10 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<CustomApplication xmlns="http://soap.sforce.com/2006/04/metadata">
- <defaultLandingTab>Merchandise__c</defaultLandingTab>
- <label>Warehouse</label>
- <tab>standard-Chatter</tab>
- <tab>standard-File</tab>
- <tab>standard-report</tab>
- <tab>standard-Dashboard</tab>
- <tab>Invoice__c</tab>
- <tab>Merchandise__c</tab>
-</CustomApplication>
+<PlatformEventChannel xmlns="http://soap.sforce.com/2006/04/metadata">
+ <channelMembers>
+ <selectedEntity>AccountChangeEvent</selectedEntity>
+ </channelMembers>
+ <channelType>data</channelType>
+ <label>ChangeEvents</label>
+</PlatformEventChannel>
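
Illustrative note (not part of the commit): the mapping done by `createChangeEventMessage` in the diff above can be tried out without a Salesforce org or any Camel dependency. The sketch below is a rough stand-in that uses only `java.util` maps. The class `ChangeEventSketch` and its methods `kindOf`, `toHeaders`, `toBody` and `sampleData` are hypothetical names introduced for this example; the header names, map keys and prefix checks are copied from the diff. One deliberate difference: the sketch copies the payload before dropping `ChangeEventHeader`, whereas the consumer in the diff removes it from the payload map in place.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; it is NOT part of camel-salesforce.
class ChangeEventSketch {

    // Same prefix checks as MessageKind.fromTopicName in the diff,
    // returning the enum constant name as a plain string.
    static String kindOf(final String topicName) {
        if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
            return "PLATFORM_EVENT";
        } else if (topicName.startsWith("data/") || topicName.startsWith("/data/")) {
            return "CHANGE_EVENT";
        }
        return "PUSH_TOPIC";
    }

    // Flattens a subset of the change event data into header name/value pairs.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toHeaders(final String topicName, final Map<String, Object> data) {
        final Map<String, Object> headers = new LinkedHashMap<>();

        final Map<String, Object> event = (Map<String, Object>) data.get("event");
        final Object replayId = event.get("replayId");
        if (replayId != null) {
            // The replay id header is only set when the event carries one.
            headers.put("CamelSalesforceReplayId", replayId);
        }

        headers.put("CamelSalesforceChangeEventSchema", data.get("schema"));
        // The event type is the last segment of the channel name.
        headers.put("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));

        final Map<String, Object> payload = (Map<String, Object>) data.get("payload");
        final Map<String, Object> changeEventHeader = (Map<String, Object>) payload.get("ChangeEventHeader");
        headers.put("CamelSalesforceChangeType", changeEventHeader.get("changeType"));
        headers.put("CamelSalesforceEntityName", changeEventHeader.get("entityName"));
        return headers;
    }

    // Body as seen when rawPayload is false: the payload without ChangeEventHeader.
    // Assumption of this sketch: work on a copy instead of mutating the payload.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toBody(final Map<String, Object> data) {
        final Map<String, Object> payload = (Map<String, Object>) data.get("payload");
        final Map<String, Object> body = new LinkedHashMap<>(payload);
        body.remove("ChangeEventHeader");
        return body;
    }

    // Made-up sample data shaped like the mocks in SalesforceConsumerTest.
    static Map<String, Object> sampleData() {
        final Map<String, Object> changeEventHeader = new HashMap<>();
        changeEventHeader.put("changeType", "CREATE");
        changeEventHeader.put("entityName", "Account");

        final Map<String, Object> payload = new HashMap<>();
        payload.put("ChangeEventHeader", changeEventHeader);
        payload.put("Name", "Sample account");

        final Map<String, Object> event = new HashMap<>();
        event.put("replayId", 4L);

        final Map<String, Object> data = new HashMap<>();
        data.put("schema", "30H2pgzuWcF844p26Ityvg");
        data.put("payload", payload);
        data.put("event", event);
        return data;
    }

    public static void main(final String[] args) {
        final Map<String, Object> data = sampleData();
        System.out.println(kindOf("data/ChangeEvents"));
        System.out.println(toHeaders("/data/AccountChangeEvent", data));
        System.out.println(toBody(data));
    }
}
```

In a real route the same values arrive as Camel message headers, so a processor would read them with `getHeader(...)` rather than building them by hand; the sketch only shows where each value comes from in the incoming data.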