You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2021/11/15 02:18:16 UTC

[camel] 02/02: camel-salesforce: restore & fix integration test.

This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2fbf4b94c5f9db6bd66040ed49abd9262ea3788c
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Sun Nov 14 20:15:45 2021 -0600

    camel-salesforce: restore & fix integration test.
---
 .../SubscriptionHelperIntegrationTest.java         | 109 ++++++++++++++++++---
 1 file changed, 96 insertions(+), 13 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 5105595..8df70f1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
@@ -53,15 +54,10 @@ import static org.mockito.Mockito.when;
 public class SubscriptionHelperIntegrationTest {
 
     final CamelContext camel;
-
     final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
-
     final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
-
     final SalesforceComponent salesforce;
-
     final StubServer server;
-
     final SubscriptionHelper subscription;
 
     SalesforceConsumer toUnsubscribe;
@@ -77,33 +73,25 @@ public class SubscriptionHelperIntegrationTest {
         @Override
         public boolean matches(final Message message) {
             final Map<String, Object> data = message.getDataAsMap();
-
             @SuppressWarnings("unchecked")
             final Map<String, Object> event = (Map<String, Object>) data.get("event");
-
             @SuppressWarnings("unchecked")
             final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
-
             return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
         }
 
         static Message messageForAccountCreationWithName(final String name) {
             return argThat(new MessageArgumentMatcher(name));
         }
-
     }
 
     public SubscriptionHelperIntegrationTest() throws SalesforceException {
         server = new StubServer();
-
         LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
                 server.port());
-
         final String instanceUrl = "http://localhost:" + server.port();
-
         server.replyTo("POST", "/services/oauth2/token",
                 "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
-
         server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
 
         server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
@@ -202,6 +190,101 @@ public class SubscriptionHelperIntegrationTest {
     }
 
     @Test
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // send failed connection message with reconnect advice "none" so we handshake again
+
+        messages.add("[\n" +
+                     "  {\n" +
+                     "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" +
+                     "    \"channel\": \"/meta/connect\",\n" +
+                     "    \"id\": \"$id\",\n" +
+                     "    \"successful\": false,\n" +
+                     "    \"advice\": {\n" +
+                     "       \"reconnect\": \"none\"\n" +
+                     "    }\n" +
+                     "  }\n" +
+                     "]");
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+
+    @Test
     void shouldResubscribeOnHelperRestart() {
         // handshake and connect
         subscription.start();