You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2018/07/06 16:05:41 UTC
[camel] branch master updated: CAMEL-11803: Salesforce platform
events
This is an automated email from the ASF dual-hosted git repository.
zregvart pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3908657 CAMEL-11803: Salesforce platform events
3908657 is described below
commit 3908657d1340fa31e7cecccb6507f3212abf7b36
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Fri Jul 6 18:05:08 2018 +0200
CAMEL-11803: Salesforce platform events
This adds support for consuming Salesforce platform events via the
CometD long polling mechanism used only for streaming PushTopic
previously.
For emitting platform events, the preexisting `createSObject` operation
can be used.
---
.../camel-salesforce-component/pom.xml | 6 +
.../src/main/docs/salesforce-component.adoc | 56 +++++
.../component/salesforce/SalesforceConsumer.java | 235 ++++++++++--------
.../salesforce/api/dto/PlatformEvent.java | 83 +++++++
.../internal/streaming/SubscriptionHelper.java | 21 +-
.../PlatformEventsConsumerIntegrationTest.java | 69 ++++++
.../salesforce/SalesforceConsumerTest.java | 266 +++++++++++++++++++++
.../salesforce/api/dto/PlatformEventTest.java | 48 ++++
.../internal/streaming/SubscriptionHelperTest.java | 8 +
.../salesforce/objects/TestEvent__e.object | 35 +++
.../it/resources/salesforce/package.xml | 6 +-
11 files changed, 731 insertions(+), 102 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml b/components/camel-salesforce/camel-salesforce-component/pom.xml
index b4e41bd..f9892c0 100644
--- a/components/camel-salesforce/camel-salesforce-component/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-component/pom.xml
@@ -204,6 +204,12 @@
<version>${okclient-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index a2276a9..9853e5c 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -212,6 +212,62 @@ To subscribe to an existing topic
from("salesforce:CamelTestTopic&sObjectName=Merchandise__c")...
----
+==== Platform events
+
+To emit a platform event use the `createSObject` operation. The message
+body can be a JSON string or an InputStream with key-value data -- in
+that case `sObjectName` needs to be set to the API name of the event --
+or an instance of a class that extends AbstractDTOBase and is named
+after the API name of the event.
+
+For example using a DTO:
+
+[source,java]
+----
+class Order_Event__e extends AbstractDTOBase {
+ @JsonProperty("OrderNumber")
+ private String orderNumber;
+ // ... other properties and getters/setters
+}
+
+from("timer:tick")
+ .process(exchange -> {
+ final Message in = exchange.getIn();
+ String orderNumber = "ORD" + String.valueOf(in.getHeader(Exchange.TIMER_COUNTER));
+ Order_Event__e event = new Order_Event__e();
+ event.setOrderNumber(orderNumber);
+ in.setBody(event);
+ })
+ .to("salesforce:createSObject");
+----
+
+Or using JSON event data:
+
+[source,java]
+----
+from("timer:tick")
+ .process(exchange -> {
+ final Message in = exchange.getIn();
+ String orderNumber = "ORD" + String.valueOf(in.getHeader(Exchange.TIMER_COUNTER));
+ in.setBody("{\"OrderNumber\":\"" + orderNumber + "\"}");
+ })
+ .to("salesforce:createSObject?sObjectName=Order_Event__e");
+----
+
+To receive platform events use the consumer endpoint with the API name of
+the platform event prefixed with `event/` (or `/event/`), e.g.:
+`salesforce:event/Order_Event__e`. A processor consuming from that
+endpoint will receive either an `org.apache.camel.component.salesforce.api.dto.PlatformEvent`
+object or an `org.cometd.bayeux.Message` in the body, depending on
+`rawPayload` being `false` or `true` respectively.
+
+For example, in the simplest form to consume one event:
+
+[source,java]
+----
+PlatformEvent event = consumer.receiveBody("salesforce:event/Order_Event__e", PlatformEvent.class);
+----
+
=== Examples
==== Uploading a document to a ContentWorkspace
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index 839ba8b..861edec 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -18,20 +18,20 @@ package org.apache.camel.component.salesforce;
import java.io.IOException;
import java.io.StringReader;
-import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
import org.apache.camel.component.salesforce.api.utils.JsonUtils;
import org.apache.camel.component.salesforce.internal.client.RestClient;
import org.apache.camel.component.salesforce.internal.streaming.PushTopicHelper;
import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
@@ -41,41 +41,57 @@ import org.cometd.bayeux.client.ClientSessionChannel;
*/
public class SalesforceConsumer extends DefaultConsumer {
+ private enum MessageKind {
+ PLATFORM_EVENT, PUSH_TOPIC;
+
+ public static MessageKind fromTopicName(final String topicName) {
+ if (topicName.startsWith("event/") || topicName.startsWith("/event/")) {
+ return MessageKind.PLATFORM_EVENT;
+ }
+
+ return PUSH_TOPIC;
+ }
+ }
- private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
- private static final String EVENT_PROPERTY = "event";
- private static final String TYPE_PROPERTY = "type";
private static final String CREATED_DATE_PROPERTY = "createdDate";
- private static final String SOBJECT_PROPERTY = "sobject";
- private static final String REPLAY_ID_PROPERTY = "replayId";
+ private static final String EVENT_PROPERTY = "event";
private static final double MINIMUM_VERSION = 24.0;
+ private static final ObjectMapper OBJECT_MAPPER = JsonUtils.createObjectMapper();
+ private static final String REPLAY_ID_PROPERTY = "replayId";
+ private static final String SOBJECT_PROPERTY = "sobject";
+ private static final String TYPE_PROPERTY = "type";
private final SalesforceEndpoint endpoint;
- private final SubscriptionHelper subscriptionHelper;
+ private final MessageKind messageKind;
private final ObjectMapper objectMapper;
- private final String topicName;
+ private final boolean rawPayload;
private final Class<?> sObjectClass;
private boolean subscribed;
+ private final SubscriptionHelper subscriptionHelper;
+ private final String topicName;
-
- public SalesforceConsumer(SalesforceEndpoint endpoint, Processor processor, SubscriptionHelper helper) {
+ public SalesforceConsumer(final SalesforceEndpoint endpoint, final Processor processor, final SubscriptionHelper helper) {
super(endpoint, processor);
this.endpoint = endpoint;
- ObjectMapper configuredObjectMapper = endpoint.getConfiguration().getObjectMapper();
+ final ObjectMapper configuredObjectMapper = endpoint.getConfiguration().getObjectMapper();
if (configuredObjectMapper != null) {
- this.objectMapper = configuredObjectMapper;
+ objectMapper = configuredObjectMapper;
} else {
- this.objectMapper = OBJECT_MAPPER;
+ objectMapper = OBJECT_MAPPER;
}
// check minimum supported API version
- if (Double.valueOf(endpoint.getConfiguration().getApiVersion()) < MINIMUM_VERSION) {
+ if (Double.parseDouble(endpoint.getConfiguration().getApiVersion()) < MINIMUM_VERSION) {
throw new IllegalArgumentException("Minimum supported API version for consumer endpoints is " + 24.0);
}
- this.topicName = endpoint.getTopicName();
- this.subscriptionHelper = helper;
+ topicName = endpoint.getTopicName();
+ subscriptionHelper = helper;
+
+ messageKind = MessageKind.fromTopicName(topicName);
+
+ rawPayload = endpoint.getConfiguration().getRawPayload();
// get sObjectClass to convert to
final String sObjectName = endpoint.getConfiguration().getSObjectName();
@@ -99,66 +115,87 @@ public class SalesforceConsumer extends DefaultConsumer {
}
- @Override
- protected void doStart() throws Exception {
- super.doStart();
+ public String getTopicName() {
+ return topicName;
+ }
- final SalesforceEndpointConfig config = endpoint.getConfiguration();
+ @Override
+ public void handleException(String message, Throwable t) {
+ super.handleException(message, t);
+ }
- // is a query configured in the endpoint?
- if (config.getSObjectQuery() != null) {
- // Note that we don't lookup topic if the query is not specified
- // create REST client for PushTopic operations
- final SalesforceComponent salesforceComponent = endpoint.getComponent();
- final RestClient restClient = salesforceComponent.createRestClientFor(endpoint);
+ public void processMessage(final ClientSessionChannel channel, final Message message) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received event {} on channel {}", channel.getId(), channel.getChannelId());
+ }
- // don't forget to start the client
- ServiceHelper.startService(restClient);
+ final Exchange exchange = endpoint.createExchange();
+ final org.apache.camel.Message in = exchange.getIn();
+
+ switch (messageKind) {
+ case PUSH_TOPIC:
+ createPushTopicMessage(message, in);
+ break;
+ case PLATFORM_EVENT:
+ createPlatformEventMessage(message, in);
+ break;
+ }
- try {
- PushTopicHelper helper = new PushTopicHelper(config, topicName, restClient);
- helper.createOrUpdateTopic();
- } finally {
- // don't forget to stop the client
- ServiceHelper.stopService(restClient);
+ try {
+ getAsyncProcessor().process(exchange);
+ } catch (final Exception e) {
+ final String msg = String.format("Error processing %s: %s", exchange, e);
+ handleException(msg, new SalesforceException(msg, e));
+ } finally {
+ final Exception ex = exchange.getException();
+ if (ex != null) {
+ final String msg = String.format("Unhandled exception: %s", ex.getMessage());
+ handleException(msg, new SalesforceException(msg, ex));
}
}
-
- // subscribe to topic
- subscriptionHelper.subscribe(topicName, this);
- subscribed = true;
}
- @Override
- protected void doStop() throws Exception {
- super.doStop();
+ void createPlatformEventMessage(final Message message, final org.apache.camel.Message in) {
+ setHeaders(in, message);
- if (subscribed) {
- subscribed = false;
- // unsubscribe from topic
- subscriptionHelper.unsubscribe(topicName, this);
+ final Map<String, Object> data = message.getDataAsMap();
+
+ @SuppressWarnings("unchecked")
+ final Map<String, Object> event = (Map<String, Object>) data.get("event");
+
+ final Object replayId = event.get(REPLAY_ID_PROPERTY);
+ if (replayId != null) {
+ in.setHeader("CamelSalesforceReplayId", replayId);
}
+
+ in.setHeader("CamelSalesforcePlatformEventSchema", data.get("schema"));
+ in.setHeader("CamelSalesforceEventType", topicName.substring(topicName.lastIndexOf('/') + 1));
+
+ final Object payload = data.get("payload");
+
+ final PlatformEvent platformEvent = objectMapper.convertValue(payload, PlatformEvent.class);
+ in.setHeader("CamelSalesforceCreatedDate", platformEvent.getCreated());
+
+ if (rawPayload) {
+ in.setBody(message);
+ } else {
+ in.setBody(platformEvent);
+ }
+
}
- public void processMessage(ClientSessionChannel channel, Message message) {
- final Exchange exchange = endpoint.createExchange();
- org.apache.camel.Message in = exchange.getIn();
+ void createPushTopicMessage(final Message message, final org.apache.camel.Message in) {
setHeaders(in, message);
- // get event data
- // TODO do we need to add NPE checks for message/data.get***???
- Map<String, Object> data = message.getDataAsMap();
+ final Map<String, Object> data = message.getDataAsMap();
@SuppressWarnings("unchecked")
final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY);
final Object eventType = event.get(TYPE_PROPERTY);
- Object createdDate = event.get(CREATED_DATE_PROPERTY);
- Object replayId = event.get(REPLAY_ID_PROPERTY);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Received event %s on channel %s created on %s",
- eventType, channel.getChannelId(), createdDate));
- }
+ final Object createdDate = event.get(CREATED_DATE_PROPERTY);
+ final Object replayId = event.get(REPLAY_ID_PROPERTY);
+ in.setHeader("CamelSalesforceTopicName", topicName);
in.setHeader("CamelSalesforceEventType", eventType);
in.setHeader("CamelSalesforceCreatedDate", createdDate);
if (replayId != null) {
@@ -173,7 +210,7 @@ public class SalesforceConsumer extends DefaultConsumer {
final String sObjectString = objectMapper.writeValueAsString(sObject);
log.debug("Received SObject: {}", sObjectString);
- if (endpoint.getConfiguration().getRawPayload()) {
+ if (rawPayload) {
// return sobject string as exchange body
in.setBody(sObjectString);
} else if (sObjectClass == null) {
@@ -181,55 +218,61 @@ public class SalesforceConsumer extends DefaultConsumer {
in.setBody(sObject);
} else {
// create the expected SObject
- in.setBody(objectMapper.readValue(
- new StringReader(sObjectString), sObjectClass));
+ in.setBody(objectMapper.readValue(new StringReader(sObjectString), sObjectClass));
}
- } catch (IOException e) {
- final String msg = String.format("Error parsing message [%s] from Topic %s: %s",
- message, topicName, e.getMessage());
+ } catch (final IOException e) {
+ final String msg = String.format("Error parsing message [%s] from Topic %s: %s", message, topicName, e.getMessage());
handleException(msg, new SalesforceException(msg, e));
}
+ }
- try {
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
- if (log.isTraceEnabled()) {
- log.trace("Done processing event: {} {}", eventType.toString(),
- doneSync ? "synchronously" : "asynchronously");
- }
- }
- });
- } catch (Exception e) {
- String msg = String.format("Error processing %s: %s", exchange, e);
- handleException(msg, new SalesforceException(msg, e));
- } finally {
- Exception ex = exchange.getException();
- if (ex != null) {
- String msg = String.format("Unhandled exception: %s", ex.getMessage());
- handleException(msg, new SalesforceException(msg, ex));
- }
+ void setHeaders(final org.apache.camel.Message in, final Message message) {
+ in.setHeader("CamelSalesforceChannel", message.getChannel());
+ final String clientId = message.getClientId();
+ if (ObjectHelper.isNotEmpty(clientId)) {
+ in.setHeader("CamelSalesforceClientId", clientId);
}
}
- private void setHeaders(org.apache.camel.Message in, Message message) {
- Map<String, Object> headers = new HashMap<>();
- // set topic name
- headers.put("CamelSalesforceTopicName", topicName);
- // set message properties as headers
- headers.put("CamelSalesforceChannel", message.getChannel());
- headers.put("CamelSalesforceClientId", message.getClientId());
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ final SalesforceEndpointConfig config = endpoint.getConfiguration();
- in.setHeaders(headers);
+ // is a query configured in the endpoint?
+ if (messageKind == MessageKind.PUSH_TOPIC && ObjectHelper.isNotEmpty(config.getSObjectQuery())) {
+ // Note that we don't lookup topic if the query is not specified
+ // create REST client for PushTopic operations
+ final SalesforceComponent salesforceComponent = endpoint.getComponent();
+ final RestClient restClient = salesforceComponent.createRestClientFor(endpoint);
+
+ // don't forget to start the client
+ ServiceHelper.startService(restClient);
+
+ try {
+ final PushTopicHelper helper = new PushTopicHelper(config, topicName, restClient);
+ helper.createOrUpdateTopic();
+ } finally {
+ // don't forget to stop the client
+ ServiceHelper.stopService(restClient);
+ }
+ }
+
+ // subscribe to topic
+ subscriptionHelper.subscribe(topicName, this);
+ subscribed = true;
}
@Override
- public void handleException(String message, Throwable t) {
- super.handleException(message, t);
- }
+ protected void doStop() throws Exception {
+ super.doStop();
- public String getTopicName() {
- return topicName;
+ if (subscribed) {
+ subscribed = false;
+ // unsubscribe from topic
+ subscriptionHelper.unsubscribe(topicName, this);
+ }
}
}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java
new file mode 100644
index 0000000..842d4f7
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/PlatformEvent.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.api.dto;
+
+import java.io.Serializable;
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public final class PlatformEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ZonedDateTime created;
+
+ private final String createdById;
+
+ private final Map<String, String> eventData = new HashMap<>();
+
+ @JsonCreator
+ public PlatformEvent(@JsonProperty("CreatedDate") final ZonedDateTime created, @JsonProperty("CreatedById") final String createdById) {
+ this.created = created;
+ this.createdById = createdById;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PlatformEvent)) {
+ return false;
+ }
+
+ final PlatformEvent other = (PlatformEvent) obj;
+
+ return Objects.equals(created, other.created) && Objects.equals(createdById, other.createdById) && Objects.equals(eventData, other.eventData);
+ }
+
+ public ZonedDateTime getCreated() {
+ return created;
+ }
+
+ public String getCreatedById() {
+ return createdById;
+ }
+
+ public Map<String, String> getEventData() {
+ return eventData;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(created, createdById, eventData);
+ }
+
+ @JsonAnySetter
+ public void set(final String name, final String value) {
+ eventData.put(name, value);
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("PlatformEvent: createdById: ").append(createdById).append(", createdId: ").append(created).append(", data: ").append(eventData)
+ .toString();
+ }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index e3a84d1..c10aa92 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -459,8 +459,25 @@ public class SubscriptionHelper extends ServiceSupport {
.filter(Objects::nonNull).findFirst();
}
- static String getChannelName(String topicName) {
- return "/topic/" + topicName;
+ static String getChannelName(final String topicName) {
+ final StringBuilder channelName = new StringBuilder();
+ if (topicName.charAt(0) != '/') {
+ channelName.append('/');
+ }
+
+ if (topicName.indexOf('/', 1) > 0) {
+ channelName.append(topicName);
+ } else {
+ channelName.append("topic/");
+ channelName.append(topicName);
+ }
+
+ final int typeIdx = channelName.indexOf("/", 1);
+ if ("event".equals(channelName.substring(1, typeIdx)) && !topicName.endsWith("__e")) {
+ channelName.append("__e");
+ }
+
+ return channelName.toString();
}
public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java
new file mode 100644
index 0000000..18d052b
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PlatformEventsConsumerIntegrationTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.entry;
+
+public class PlatformEventsConsumerIntegrationTest extends AbstractSalesforceTestBase {
+
+ @Test
+ public void shouldConsumePlatformEvents() throws InterruptedException, ExecutionException {
+ final ExecutorService parallel = Executors.newSingleThreadExecutor();
+
+ final Future<PlatformEvent> futurePlatformEvent = parallel.submit(() -> consumer.receiveBody("salesforce:event/TestEvent__e", PlatformEvent.class));
+
+ // it takes some time for the subscriber to subscribe, so we'll try to
+ // send repeated platform events and wait until the first one is
+ // received
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ template.sendBody("direct:sendPlatformEvent", "{\"Test_Field__c\": \"data\"}");
+
+ Assertions.assertThat(futurePlatformEvent.isDone()).isTrue();
+ });
+
+ final PlatformEvent platformEvent = futurePlatformEvent.get();
+ Assertions.assertThat(platformEvent).isNotNull();
+ Assertions.assertThat(platformEvent.getEventData()).containsOnly(entry("Test_Field__c", "data"));
+ }
+
+ @Override
+ protected RouteBuilder doCreateRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:sendPlatformEvent").to("salesforce:createSObject?sObjectName=TestEvent__e");
+ }
+ };
+ }
+
+ @Override
+ protected String salesforceApiVersionToUse() {
+ return "41.0";
+ }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
new file mode 100644
index 0000000..80db986
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.salesforce.api.dto.PlatformEvent;
+import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper;
+import org.apache.camel.spi.ClassResolver;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.common.HashMapMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class SalesforceConsumerTest {
+
+ public static class AccountUpdates {
+
+ @JsonProperty("Id")
+ String id;
+
+ @JsonProperty("Name")
+ String name;
+
+ @JsonProperty("Phone")
+ String phone;
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof AccountUpdates)) {
+ return false;
+ }
+
+ final AccountUpdates other = (AccountUpdates) obj;
+
+ return Objects.equals(id, other.id) && Objects.equals(name, other.name) && Objects.equals(phone, other.phone);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, phone);
+ }
+
+ }
+
+ static final SubscriptionHelper NOT_USED = null;
+
+ SalesforceEndpointConfig configuration = new SalesforceEndpointConfig();
+
+ SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
+
+ Exchange exchange = mock(Exchange.class);
+
+ org.apache.camel.Message in = mock(org.apache.camel.Message.class);
+
+ AsyncProcessor processor = mock(AsyncProcessor.class);
+
+ Message pushTopicMessage;
+
+ @Before
+ public void setupMocks() {
+ when(endpoint.getConfiguration()).thenReturn(configuration);
+ when(endpoint.createExchange()).thenReturn(exchange);
+ when(exchange.getIn()).thenReturn(in);
+ final SalesforceComponent component = mock(SalesforceComponent.class);
+ when(endpoint.getComponent()).thenReturn(component);
+ final CamelContext camelContext = mock(CamelContext.class);
+ when(component.getCamelContext()).thenReturn(camelContext);
+ final ClassResolver classResolver = mock(ClassResolver.class);
+ when(camelContext.getClassResolver()).thenReturn(classResolver);
+ when(classResolver.resolveClass(AccountUpdates.class.getName())).thenReturn((Class) AccountUpdates.class);
+
+ pushTopicMessage = createPushTopicMessage();
+ }
+
+ @Test
+ public void shouldProcessMappedPayloadPushTopicMessages() throws Exception {
+ when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+ configuration.setSObjectClass(AccountUpdates.class.getName());
+
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+
+ final AccountUpdates accountUpdates = new AccountUpdates();
+ accountUpdates.phone = "(415) 555-1212";
+ accountUpdates.id = "001D000000KneakIAB";
+ accountUpdates.name = "Blackbeard";
+
+ verify(in).setBody(accountUpdates);
+ verify(in).setHeader("CamelSalesforceEventType", "created");
+ verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+ verify(in).setHeader("CamelSalesforceReplayId", 1L);
+ verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+ verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+ verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+ verify(processor).process(same(exchange));
+ }
+
+ @Test
+ public void shouldProcessPlatformEvents() throws Exception {
+ when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e");
+
+ final Message message = new HashMapMessage();
+ final Map<String, Object> data = new HashMap<>();
+ data.put("schema", "30H2pgzuWcF844p26Ityvg");
+
+ final Map<String, Object> payload = new HashMap<>();
+ payload.put("Test_Field__c", "abc");
+ payload.put("CreatedById", "00541000002WYFpAAO");
+ payload.put("CreatedDate", "2018-07-06T12:41:04Z");
+ data.put("payload", payload);
+ data.put("event", Collections.singletonMap("replayId", 4L));
+ message.put("data", data);
+ message.put("channel", "/event/TestEvent__e");
+
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ consumer.processMessage(mock(ClientSessionChannel.class), message);
+
+ final ZonedDateTime created = ZonedDateTime.parse("2018-07-06T12:41:04Z");
+ final PlatformEvent event = new PlatformEvent(created, "00541000002WYFpAAO");
+ event.set("Test_Field__c", "abc");
+ verify(in).setBody(event);
+ verify(in).setHeader("CamelSalesforceCreatedDate", created);
+ verify(in).setHeader("CamelSalesforceReplayId", 4L);
+ verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e");
+ verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e");
+ verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg");
+
+ verify(processor).process(same(exchange));
+
+ verifyNoMoreInteractions(in, processor);
+ }
+
+ @Test
+ public void shouldProcessPushTopicMessages() throws Exception {
+ // given: a consumer whose endpoint reports the AccountUpdates topic name
+ when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ // when: the pushTopicMessage fixture is handed to the consumer together with a mocked channel
+ consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+ // then: the body equals the "sobject" entry of the message data
+ @SuppressWarnings("unchecked")
+ final Map<String, Object> messageData = (Map<String, Object>) pushTopicMessage.get("data");
+ verify(in).setBody(messageData.get("sobject"));
+ verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+ verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+ verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+ verify(in).setHeader("CamelSalesforceReplayId", 1L);
+ verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+ verify(in).setHeader("CamelSalesforceEventType", "created");
+ verify(processor).process(same(exchange));
+ }
+
+ @Test
+ public void shouldProcessRawPayloadPushTopicMessages() throws Exception {
+ // given: raw payload mode and an endpoint that reports the AccountUpdates topic name
+ configuration.setRawPayload(true);
+ when(endpoint.getTopicName()).thenReturn("AccountUpdates");
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+ // when: the pushTopicMessage fixture is handed to the consumer together with a mocked channel
+ consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
+
+ // then: the body is a JSON string, the headers match the non-raw push topic case
+ verify(in).setBody("{\"Phone\":\"(415) 555-1212\",\"Id\":\"001D000000KneakIAB\",\"Name\":\"Blackbeard\"}");
+ verify(in).setHeader("CamelSalesforceClientId", "lxdl9o32njygi1gj47kgfaga4k");
+ verify(in).setHeader("CamelSalesforceChannel", "/topic/AccountUpdates");
+ verify(in).setHeader("CamelSalesforceTopicName", "AccountUpdates");
+ verify(in).setHeader("CamelSalesforceReplayId", 1L);
+ verify(in).setHeader("CamelSalesforceCreatedDate", "2016-09-16T19:45:27.454Z");
+ verify(in).setHeader("CamelSalesforceEventType", "created");
+ verify(processor).process(same(exchange));
+ }
+
+ @Test
+ public void shouldProcessRawPlatformEvents() throws Exception {
+ // given: raw payload mode and an endpoint that reports the /event/TestEvent__e topic name
+ configuration.setRawPayload(true);
+ when(endpoint.getTopicName()).thenReturn("/event/TestEvent__e");
+ final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+
+ final Map<String, Object> eventFields = new HashMap<>();
+ eventFields.put("Test_Field__c", "abc");
+ eventFields.put("CreatedById", "00541000002WYFpAAO");
+ eventFields.put("CreatedDate", "2018-07-06T12:41:04Z");
+
+ final Map<String, Object> messageData = new HashMap<>();
+ messageData.put("schema", "30H2pgzuWcF844p26Ityvg");
+ messageData.put("payload", eventFields);
+ messageData.put("event", Collections.singletonMap("replayId", 4L));
+
+ final Message platformEventMessage = new HashMapMessage();
+ platformEventMessage.put("data", messageData);
+ platformEventMessage.put("channel", "/event/TestEvent__e");
+ // when: the message is handed to the consumer together with a mocked channel
+ consumer.processMessage(mock(ClientSessionChannel.class), platformEventMessage);
+
+ // then: the body is the received message, the headers still carry the event metadata
+ verify(in).setBody(platformEventMessage);
+ verify(in).setHeader("CamelSalesforcePlatformEventSchema", "30H2pgzuWcF844p26Ityvg");
+ verify(in).setHeader("CamelSalesforceEventType", "TestEvent__e");
+ verify(in).setHeader("CamelSalesforceChannel", "/event/TestEvent__e");
+ verify(in).setHeader("CamelSalesforceReplayId", 4L);
+ verify(in).setHeader("CamelSalesforceCreatedDate", ZonedDateTime.parse("2018-07-06T12:41:04Z"));
+ verify(processor).process(same(exchange));
+ verifyNoMoreInteractions(in, processor);
+ }
+
+ /** Builds a push topic message for the /topic/AccountUpdates channel with "event" and "sobject" data. */
+ static Message createPushTopicMessage() {
+ // the "event" part: creation date, replay id and event type
+ final Map<String, Object> eventDetails = new HashMap<>();
+ eventDetails.put("createdDate", "2016-09-16T19:45:27.454Z");
+ eventDetails.put("replayId", 1L);
+ eventDetails.put("type", "created");
+
+ // the "sobject" part: the fields of the object the message is about
+ final Map<String, Object> sobjectFields = new HashMap<>();
+ sobjectFields.put("Phone", "(415) 555-1212");
+ sobjectFields.put("Id", "001D000000KneakIAB");
+ sobjectFields.put("Name", "Blackbeard");
+
+ final Map<String, Object> messageData = new HashMap<>();
+ messageData.put("event", eventDetails);
+ messageData.put("sobject", sobjectFields);
+
+ final Message message = new HashMapMessage();
+ message.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
+ message.put("data", messageData);
+ message.put("channel", "/topic/AccountUpdates");
+ return message;
+ }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java
new file mode 100644
index 0000000..094b28d
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/api/dto/PlatformEventTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.api.dto;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+
+public class PlatformEventTest {
+
+ @Test
+ public void shouldDeserialize() throws IOException {
+ final String json = "{\n" + //
+ " \"CreatedDate\": \"2017-04-14T13:35:23Z\", \n" + //
+ " \"CreatedById\": \"005B00000031mqb\", \n" + //
+ " \"Order_Number__c\": \"10013\", \n" + //
+ " \"Type__c\": \"Placed\"\n" + //
+ "}";
+ final ObjectMapper objectMapper = JsonUtils.createObjectMapper();
+ final PlatformEvent deserialized = objectMapper.readValue(json, PlatformEvent.class);
+
+ // CreatedDate and CreatedById are exposed through getters, the remaining fields through the event data
+ assertThat(deserialized.getCreated()).isEqualTo(ZonedDateTime.parse("2017-04-14T13:35:23Z"));
+ assertThat(deserialized.getCreatedById()).isEqualTo("005B00000031mqb");
+ assertThat(deserialized.getEventData()).containsOnly(entry("Order_Number__c", "10013"), entry("Type__c", "Placed"));
+ }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index dc5e454..56b4a8b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -27,6 +27,7 @@ import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.junit.Test;
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -104,4 +105,11 @@ public class SubscriptionHelperTest {
assertEquals("Expecting replayId for `my-topic-3` to be 6, as it is endpoint configured explicitly on the endpoint",
Optional.of(6L), determineReplayIdFor(endpoint, "my-topic-3"));
}
+
+ @Test
+ public void shouldDetermineChannelNames() {
+ assertThat(SubscriptionHelper.getChannelName("topic1")).isEqualTo("/topic/topic1"); // plain name: "/topic/" is prepended
+ assertThat(SubscriptionHelper.getChannelName("event/Test")).isEqualTo("/event/Test__e"); // "event/" name: leading "/" and "__e" suffix are added
+ assertThat(SubscriptionHelper.getChannelName("event/Test__e")).isEqualTo("/event/Test__e"); // an existing "__e" suffix is not duplicated
+ }
}
diff --git a/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object b/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object
new file mode 100644
index 0000000..bab35f9
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/objects/TestEvent__e.object
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
+ <deploymentStatus>Deployed</deploymentStatus>
+ <eventType>StandardVolume</eventType>
+ <fields>
+ <fullName>Test_Field__c</fullName>
+ <externalId>false</externalId>
+ <isFilteringDisabled>false</isFilteringDisabled>
+ <isNameField>false</isNameField>
+ <isSortingDisabled>false</isSortingDisabled>
+ <label>Test Field</label>
+ <length>20</length>
+ <required>false</required>
+ <type>Text</type>
+ <unique>false</unique>
+ </fields>
+ <label>Test Event</label>
+ <pluralLabel>Test Event</pluralLabel>
+</CustomObject>
\ No newline at end of file
diff --git a/components/camel-salesforce/it/resources/salesforce/package.xml b/components/camel-salesforce/it/resources/salesforce/package.xml
index eac3328..448f80f 100644
--- a/components/camel-salesforce/it/resources/salesforce/package.xml
+++ b/components/camel-salesforce/it/resources/salesforce/package.xml
@@ -42,9 +42,7 @@
<name>CustomField</name>
</types>
<types>
- <members>Invoice__c</members>
- <members>Line_Item__c</members>
- <members>Merchandise__c</members>
+ <members>*</members>
<name>CustomObject</name>
</types>
<types>
@@ -89,5 +87,5 @@
<members>CamelSalesforceIntegrationTests</members>
<name>ConnectedApp</name>
</types>
- <version>38.0</version>
+ <version>41.0</version>
</Package>