You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2021/11/15 02:18:16 UTC
[camel] 02/02: camel-salesforce: restore & fix integration test.
This is an automated email from the ASF dual-hosted git repository.
jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2fbf4b94c5f9db6bd66040ed49abd9262ea3788c
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Sun Nov 14 20:15:45 2021 -0600
camel-salesforce: restore & fix integration test.
---
.../SubscriptionHelperIntegrationTest.java | 109 ++++++++++++++++++---
1 file changed, 96 insertions(+), 13 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 5105595..8df70f1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
import org.slf4j.LoggerFactory;
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
@@ -53,15 +54,10 @@ import static org.mockito.Mockito.when;
public class SubscriptionHelperIntegrationTest {
final CamelContext camel;
-
final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
-
final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
-
final SalesforceComponent salesforce;
-
final StubServer server;
-
final SubscriptionHelper subscription;
SalesforceConsumer toUnsubscribe;
@@ -77,33 +73,25 @@ public class SubscriptionHelperIntegrationTest {
@Override
public boolean matches(final Message message) {
final Map<String, Object> data = message.getDataAsMap();
-
@SuppressWarnings("unchecked")
final Map<String, Object> event = (Map<String, Object>) data.get("event");
-
@SuppressWarnings("unchecked")
final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
-
return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
}
static Message messageForAccountCreationWithName(final String name) {
return argThat(new MessageArgumentMatcher(name));
}
-
}
public SubscriptionHelperIntegrationTest() throws SalesforceException {
server = new StubServer();
-
LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
server.port());
-
final String instanceUrl = "http://localhost:" + server.port();
-
server.replyTo("POST", "/services/oauth2/token",
"{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
-
server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
@@ -202,6 +190,101 @@ public class SubscriptionHelperIntegrationTest {
}
@Test
+ void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+ // handshake and connect
+ subscription.start();
+
+ final SalesforceConsumer consumer
+ = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+
+ final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+ // subscribe
+ when(consumer.getTopicName()).thenReturn("Account");
+
+ when(consumer.getEndpoint()).thenReturn(endpoint);
+ when(endpoint.getConfiguration()).thenReturn(config);
+ when(endpoint.getComponent()).thenReturn(salesforce);
+ when(endpoint.getTopicName()).thenReturn("Account");
+
+ subscription.subscribe("Account", consumer);
+
+ // push one message so we know connection is established and consumer
+ // receives notifications
+ messages.add("[\n"
+ + " {\n"
+ + " \"data\": {\n"
+ + " \"event\": {\n"
+ + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+ + " \"replayId\": 1,\n"
+ + " \"type\": \"created\"\n"
+ + " },\n"
+ + " \"sobject\": {\n"
+ + " \"Id\": \"0011n00002XWMgVAAX\",\n"
+ + " \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"channel\": \"/topic/Account\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+ + " \"channel\": \"/meta/connect\",\n"
+ + " \"id\": \"$id\",\n"
+ + " \"successful\": true\n"
+ + " }\n"
+ + "]");
+
+ verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
+ messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+ // send a failed /meta/connect reply whose advice is reconnect "none" so we handshake again
+
+ messages.add("[\n" +
+ " {\n" +
+ " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" +
+ " \"channel\": \"/meta/connect\",\n" +
+ " \"id\": \"$id\",\n" +
+ " \"successful\": false,\n" +
+ " \"advice\": {\n" +
+ " \"reconnect\": \"none\"\n" +
+ " }\n" +
+ " }\n" +
+ "]");
+
+ // queue next message for when the client recovers
+ messages.add("[\n"
+ + " {\n"
+ + " \"data\": {\n"
+ + " \"event\": {\n"
+ + " \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+ + " \"replayId\": 2,\n"
+ + " \"type\": \"created\"\n"
+ + " },\n"
+ + " \"sobject\": {\n"
+ + " \"Id\": \"0011n00002XWMgVAAX\",\n"
+ + " \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"channel\": \"/topic/Account\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+ + " \"channel\": \"/meta/connect\",\n"
+ + " \"id\": \"$id\",\n"
+ + " \"successful\": true\n"
+ + " }\n"
+ + "]");
+
+ // assert last message was received, recovery can take a bit
+ verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+ messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+ verify(consumer, atLeastOnce()).getEndpoint();
+ verify(consumer, atLeastOnce()).getTopicName();
+ verifyNoMoreInteractions(consumer);
+ }
+
+ @Test
void shouldResubscribeOnHelperRestart() {
// handshake and connect
subscription.start();