You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2021/11/15 02:18:14 UTC

[camel] branch main updated (b44527d -> 2fbf4b9)

This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from b44527d  fix doc error
     new 706e77a  camel-salesforce: Don't login if shutting down
     new 2fbf4b9  camel-salesforce: restore & fix integration test.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/streaming/SubscriptionHelper.java     |   3 +
 .../SubscriptionHelperIntegrationTest.java         | 109 ++++++++++++++++++---
 2 files changed, 99 insertions(+), 13 deletions(-)

[camel] 02/02: camel-salesforce: restore & fix integration test.

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2fbf4b94c5f9db6bd66040ed49abd9262ea3788c
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Sun Nov 14 20:15:45 2021 -0600

    camel-salesforce: restore & fix integration test.
---
 .../SubscriptionHelperIntegrationTest.java         | 109 ++++++++++++++++++---
 1 file changed, 96 insertions(+), 13 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 5105595..8df70f1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
@@ -53,15 +54,10 @@ import static org.mockito.Mockito.when;
 public class SubscriptionHelperIntegrationTest {
 
     final CamelContext camel;
-
     final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
-
     final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
-
     final SalesforceComponent salesforce;
-
     final StubServer server;
-
     final SubscriptionHelper subscription;
 
     SalesforceConsumer toUnsubscribe;
@@ -77,33 +73,25 @@ public class SubscriptionHelperIntegrationTest {
         @Override
         public boolean matches(final Message message) {
             final Map<String, Object> data = message.getDataAsMap();
-
             @SuppressWarnings("unchecked")
             final Map<String, Object> event = (Map<String, Object>) data.get("event");
-
             @SuppressWarnings("unchecked")
             final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
-
             return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
         }
 
         static Message messageForAccountCreationWithName(final String name) {
             return argThat(new MessageArgumentMatcher(name));
         }
-
     }
 
     public SubscriptionHelperIntegrationTest() throws SalesforceException {
         server = new StubServer();
-
         LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
                 server.port());
-
         final String instanceUrl = "http://localhost:" + server.port();
-
         server.replyTo("POST", "/services/oauth2/token",
                 "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
-
         server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
 
         server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
@@ -202,6 +190,101 @@ public class SubscriptionHelperIntegrationTest {
     }
 
     @Test
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // send failed connection message w/o reconnect advice so we handshake again
+
+        messages.add("[\n" +
+                     "  {\n" +
+                     "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n" +
+                     "    \"channel\": \"/meta/connect\",\n" +
+                     "    \"id\": \"$id\",\n" +
+                     "    \"successful\": false,\n" +
+                     "    \"advice\": {\n" +
+                     "       \"reconnect\": \"none\"\n" +
+                     "    }\n" +
+                     "  }\n" +
+                     "]");
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+
+    @Test
     void shouldResubscribeOnHelperRestart() {
         // handshake and connect
         subscription.start();

[camel] 01/02: camel-salesforce: Don't login if shutting down

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 706e77a49c57c3f5fda7de30c5a6ba0b636ff895
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Sun Nov 14 20:10:23 2021 -0600

    camel-salesforce: Don't login if shutting down
---
 .../component/salesforce/internal/streaming/SubscriptionHelper.java    | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index d730a66..c3831ba 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -543,6 +543,9 @@ public class SubscriptionHelper extends ServiceSupport {
         try {
             for (;;) {
                 try {
+                    if (isStoppingOrStopped()) {
+                        return;
+                    }
                     session.login(session.getAccessToken());
                     break;
                 } catch (SalesforceException e) {