You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/16 14:53:44 UTC

[camel] branch issue/CAMEL-12871 updated (4e4a063 -> 341e504)

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a change to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git.


    omit 4e4a063  CAMEL-12871: disconnect on handshake failure
    omit d22c3a4  CAMEL-12871: release resources on stop
    omit f1835b2  CAMEL-12871: stub server and test
     new a7fb4ec  CAMEL-12871: stub server and test
     new ca55b79  CAMEL-12871: release resources on stop
     new 341e504  CAMEL-12871: disconnect on handshake failure

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4e4a063)
            \
             N -- N -- N   refs/heads/issue/CAMEL-12871 (341e504)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/streaming/SubscriptionHelperIntegrationTest.java           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[camel] 03/03: CAMEL-12871: disconnect on handshake failure

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 341e504f83b2c110dde7ec1c185a0502a1b8ed47
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:43:13 2020 +0100

    CAMEL-12871: disconnect on handshake failure
    
    If we can't connect and perform the handshake, disconnecting will
    trigger a client restart with back-off. Also, because the signal to
    restart can occur on multiple threads, we need to guard against
    restarts happening in parallel.
---
 .../internal/streaming/SubscriptionHelper.java     | 132 ++++++++++++---------
 1 file changed, 77 insertions(+), 55 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 25e363b..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -93,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
+    private final AtomicBoolean restarting = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
         this.component = component;
@@ -173,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectError = (String) message.get(ERROR_FIELD);
                         connectException = getFailure(message);
 
+                        client.disconnect();
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -206,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_DISCONNECT).addListener(disconnectListener);
 
+        connect();
+    }
+
+    private void connect() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -229,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
 
     // launch an async task to restart
     private void restartClient() {
+        if (!restarting.compareAndSet(false, true)) {
+            return;
+        }
 
         // launch a new restart command
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
         httpClient.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                try {
+                    performClientRestart();
+                } finally {
+                    restarting.set(false);
+                }
+            }
+        });
+    }
 
-                LOG.info("Restarting on unexpected disconnect from Salesforce...");
-                boolean abort = false;
+    private void performClientRestart() {
+        if (isStoppingOrStopped()) {
+            return;
+        }
 
-                // wait for disconnect
-                LOG.debug("Waiting to disconnect...");
-                while (!client.isDisconnected()) {
-                    try {
-                        Thread.sleep(DISCONNECT_INTERVAL);
-                    } catch (InterruptedException e) {
-                        LOG.error("Aborting restart on interrupt!");
-                        abort = true;
-                    }
-                }
+        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        boolean abort = false;
+
+        // wait for disconnect; give up on interrupt or once the service is stopping/stopped
+        LOG.debug("Waiting to disconnect...");
+        while (!abort && !client.isDisconnected()) {
+            try {
+                Thread.sleep(DISCONNECT_INTERVAL);
+            } catch (InterruptedException e) {
+                LOG.error("Aborting restart on interrupt!");
+                abort = true;
+            }
 
-                if (!abort) {
+            abort = abort || isStoppingOrStopped(); // must not clear an abort requested by the interrupt above
+        }
 
-                    // update restart attempt backoff
-                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
-                    if (backoff > maxBackoff) {
-                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
-                        abort = true;
-                    } else {
+        if (!abort) {
 
-                        // pause before restart attempt
-                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
-                        try {
-                            Thread.sleep(backoff);
-                        } catch (InterruptedException e) {
-                            LOG.error("Aborting restart on interrupt!");
-                            abort = true;
-                        }
-                    }
+            // update restart attempt backoff
+            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                abort = true;
+            } else {
 
-                    if (!abort) {
-                        Exception lastError = new SalesforceException("Unknown error", null);
-                        try {
-                            // reset client
-                            doStop();
+                // pause before restart attempt
+                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    LOG.error("Aborting restart on interrupt!");
+                    abort = true;
+                }
+            }
 
-                            // register listeners and restart
-                            doStart();
+            if (!abort) {
+                Exception lastError = new SalesforceException("Unknown error", null);
+                try {
+                    // reset client
+                    doStop();
 
-                        } catch (Exception e) {
-                            LOG.error("Error restarting: " + e.getMessage(), e);
-                            lastError = e;
-                        }
+                    // register listeners and restart
+                    doStart();
 
-                        if (client != null && client.isHandshook()) {
-                            LOG.info("Successfully restarted!");
-                            // reset backoff interval
-                            restartBackoff.set(client.getBackoffIncrement());
-                        } else {
-                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
-                            if ((backoff + backoffIncrement) > maxBackoff) {
-                                // notify all consumers
-                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
-                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                    consumer.handleException(abortMsg, ex);
-                                }
-                            }
+                } catch (Exception e) {
+                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    lastError = e;
+                }
+
+                if (client != null && client.isHandshook()) {
+                    LOG.info("Successfully restarted!");
+                    // reset backoff interval
+                    restartBackoff.set(client.getBackoffIncrement());
+                } else {
+                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    if ((backoff + backoffIncrement) > maxBackoff) {
+                        // notify all consumers
+                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                            consumer.handleException(abortMsg, ex);
                         }
                     }
                 }
-
             }
-        });
+        }
     }
 
     @SuppressWarnings("unchecked")


[camel] 01/03: CAMEL-12871: stub server and test

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a7fb4ec2f5c57f05f0ebf3c9c736979f014e83bf
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 15:52:35 2020 +0100

    CAMEL-12871: stub server and test
    
    Adds a stub server implemented in Jetty, as it is already pulled in as a
    dependency, and an integration test for testing streaming resiliency.
    
    Two cases are added in the integration test: server closing the
    TCP connection (e.g. in an abrupt server shutdown), and restarting the
    `SubscriptionHelper` service (e.g. when a route is restarted).
---
 .../salesforce/internal/streaming/StubServer.java  | 295 ++++++++++++++++
 .../SubscriptionHelperIntegrationTest.java         | 372 +++++++++++++++++++++
 2 files changed, 667 insertions(+)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
new file mode 100644
index 0000000..56bc8cf
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.camel.util.IOHelper;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.AbstractNetworkConnector;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StubServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StubServer.class);
+
+    private final List<StubResponse> defaultStubs = new ArrayList<>();
+
+    private final Server server;
+
+    private final List<StubResponse> stubs = new ArrayList<>();
+
+    class StubHandler extends AbstractHandler {
+
+        @Override
+        public void handle(
+                final String target, final Request baseRequest, final HttpServletRequest request,
+                final HttpServletResponse response)
+                throws IOException, ServletException {
+            final String body;
+            try (Reader bodyReader = request.getReader()) {
+                body = IOHelper.toString(bodyReader);
+            }
+
+            final StubResponse stub = stubFor(request, body);
+
+            if (stub == null) {
+                LOG.error("Stub not found for {} {}", request.getMethod(), request.getRequestURI());
+                response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
+                return;
+            }
+
+            response.setStatus(stub.responseStatus);
+            response.setContentType("application/json;charset=UTF-8");
+
+            final String id = messageIdFrom(body);
+
+            try (Writer out = response.getWriter()) {
+                stub.writeTo(id, out);
+            }
+        }
+
+        private StubResponse stubFor(final HttpServletRequest request, final String body) throws IOException {
+            final List<StubResponse> allResponses = new ArrayList<>(defaultStubs);
+            allResponses.addAll(stubs);
+
+            for (final StubResponse stub : allResponses) {
+                if (stub.matches(request, body)) {
+                    return stub;
+                }
+            }
+
+            return null;
+        }
+
+    }
+
+    final class StubResponse {
+
+        private Predicate<String> requestCondition;
+
+        private final String requestMethod;
+
+        private final String requestPath;
+
+        private BlockingQueue<String> responseMessages;
+
+        private final int responseStatus;
+
+        private String responseString;
+
+        public StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                            final Predicate<String> requestCondition,
+                            final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus) {
+            this.responseStatus = responseStatus;
+            this.requestMethod = Objects.requireNonNull(requestMethod, "requestMethod");
+            this.requestPath = Objects.requireNonNull(requestPath, "requestPath");
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.requestCondition = requestCondition;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseString = responseString;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus);
+            this.responseString = responseString;
+        }
+
+        @Override
+        public String toString() {
+            return requestMethod + " " + requestPath;
+        }
+
+        private boolean matches(final HttpServletRequest request, final String body) throws IOException {
+            final boolean matches = Objects.equals(requestMethod, request.getMethod())
+                    && Objects.equals(requestPath, request.getRequestURI());
+
+            if (!matches) {
+                return false;
+            }
+
+            if (requestCondition == null) {
+                return true;
+            }
+
+            return requestCondition.test(body);
+        }
+
+        private void writeTo(final String messageId, final Writer out) throws IOException {
+            if (responseString != null) {
+                out.write(responseString.replace("$id", messageId));
+                out.flush();
+                return;
+            }
+
+            if (responseMessages != null) {
+                while (true) {
+                    try {
+                        final String message = responseMessages.poll(25, TimeUnit.MILLISECONDS);
+                        if (message != null) {
+                            out.write(message.replace("$id", messageId));
+                            out.flush();
+                            return;
+                        }
+
+                        if (!server.isRunning()) {
+                            return;
+                        }
+                    } catch (final InterruptedException ignored) {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    public StubServer() {
+        server = new Server(0);
+        server.setHandler(new StubHandler());
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public void abruptlyRestart() {
+        final int port = port();
+
+        stop();
+
+        connector().setPort(port);
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public int port() {
+        return connector().getLocalPort();
+    }
+
+    public void replyTo(final String method, final String path, final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, messages));
+    }
+
+    public void replyTo(final String method, final String path, final int status) {
+        stubs.add(new StubResponse(method, path, status));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition,
+            final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, messages));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition, final String response) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, response));
+    }
+
+    public void replyTo(final String method, final String path, final String response) {
+        stubs.add(new StubResponse(method, path, 200, response));
+    }
+
+    public void reset() {
+        stubs.clear();
+    }
+
+    public void stop() {
+        try {
+            for (final EndPoint endPoint : connector().getConnectedEndPoints()) {
+                endPoint.close();
+            }
+
+            server.stop();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void stubsAsDefaults() {
+        defaultStubs.addAll(stubs);
+        stubs.clear();
+    }
+
+    private AbstractNetworkConnector connector() {
+        final AbstractNetworkConnector connector = (AbstractNetworkConnector) server.getConnectors()[0];
+        return connector;
+    }
+
+    private static String messageIdFrom(final String body) {
+        // Copy the digits that follow the first "id":" marker; yields "" when the marker is absent.
+        final int found = body.indexOf("\"id\":\"");
+        if (found < 0) {
+            return "";
+        }
+        final int start = found + 6; // skip past the 6-character marker
+        int end = start;
+        while (end < body.length() && Character.isDigit(body.charAt(end))) {
+            end++;
+        }
+        return body.substring(start, end);
+    }
+
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
new file mode 100644
index 0000000..49220f3
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.component.salesforce.AuthenticationType;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class SubscriptionHelperIntegrationTest {
+
+    final CamelContext camel;
+
+    final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
+
+    final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
+
+    final SalesforceComponent salesforce;
+
+    final StubServer server;
+
+    final SubscriptionHelper subscription;
+
+    SalesforceConsumer toUnsubscribe;
+
+    static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
+
+        private final String name;
+
+        public MessageArgumentMatcher(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public boolean matches(final Message message) {
+            final Map<String, Object> data = message.getDataAsMap();
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> event = (Map<String, Object>) data.get("event");
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
+
+            return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
+        }
+
+        static Message messageForAccountCreationWithName(final String name) {
+            return argThat(new MessageArgumentMatcher(name));
+        }
+
+    }
+
+    public SubscriptionHelperIntegrationTest() throws SalesforceException {
+        server = new StubServer();
+
+        LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
+                server.port());
+
+        final String instanceUrl = "http://localhost:" + server.port();
+
+        server.replyTo("POST", "/services/oauth2/token",
+                "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
+
+        server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
+                                                                                                     + "  {\n"
+                                                                                                     + "    \"ext\": {\n"
+                                                                                                     + "      \"replay\": true,\n"
+                                                                                                     + "      \"payload.format\": true\n"
+                                                                                                     + "    },\n"
+                                                                                                     + "    \"minimumVersion\": \"1.0\",\n"
+                                                                                                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                     + "    \"supportedConnectionTypes\": [\n"
+                                                                                                     + "      \"long-polling\"\n"
+                                                                                                     + "    ],\n"
+                                                                                                     + "    \"channel\": \"/meta/handshake\",\n"
+                                                                                                     + "    \"id\": \"$id\",\n"
+                                                                                                     + "    \"version\": \"1.0\",\n"
+                                                                                                     + "    \"successful\": true\n"
+                                                                                                     + "  }\n"
+                                                                                                     + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
+                (req) -> req.contains("\"timeout\":0"), "[\n"
+                                                        + "  {\n"
+                                                        + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
+                                                        + "    \"advice\": {\n"
+                                                        + "      \"interval\": 0,\n"
+                                                        + "      \"timeout\": 110000,\n"
+                                                        + "      \"reconnect\": \"retry\"\n"
+                                                        + "    },\n"
+                                                        + "    \"channel\": \"/meta/connect\",\n"
+                                                        + "    \"id\": \"$id\",\n"
+                                                        + "    \"successful\": true\n"
+                                                        + "  }\n"
+                                                        + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n"
+                                                                                                     + "  {\n"
+                                                                                                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                     + "    \"channel\": \"/meta/subscribe\",\n"
+                                                                                                     + "    \"id\": \"$id\",\n"
+                                                                                                     + "    \"subscription\": \"/topic/Account\",\n"
+                                                                                                     + "    \"successful\": true\n"
+                                                                                                     + "  }\n"
+                                                                                                     + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n"
+                                                                                                       + "  {\n"
+                                                                                                       + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                       + "    \"channel\": \"/meta/unsubscribe\",\n"
+                                                                                                       + "    \"id\": \"$id\",\n"
+                                                                                                       + "    \"subscription\": \"/topic/Account\",\n"
+                                                                                                       + "    \"successful\": true\n"
+                                                                                                       + "  }\n"
+                                                                                                       + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n"
+                                                                                                      + "  {\n"
+                                                                                                      + "     \"channel\": \"/meta/disconnect\",\n"
+                                                                                                      + "     \"clientId\": \"client-id\"\n"
+                                                                                                      + "   }\n"
+                                                                                                      + "]");
+
+        server.replyTo("GET", "/services/oauth2/revoke", 200);
+
+        server.stubsAsDefaults();
+
+        camel = new DefaultCamelContext();
+        camel.start();
+        salesforce = new SalesforceComponent(camel);
+        salesforce.setLoginUrl(instanceUrl);
+        salesforce.setClientId("clientId");
+        salesforce.setClientSecret("clientSecret");
+        salesforce.setRefreshToken("refreshToken");
+        salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
+        salesforce.setConfig(config);
+
+        salesforce.start();
+        subscription = new SubscriptionHelper(salesforce);
+    }
+
+    @BeforeEach // runs before every test: detach the consumer kept from the previous test, then reset the stub server
+    public void cleanSlate() throws CamelException {
+        if (toUnsubscribe != null) { // assigned by each test when it creates its mock consumer
+            subscription.unsubscribe("Account", toUnsubscribe);
+        }
+        server.reset(); // NOTE(review): presumably restores the stubs recorded via stubsAsDefaults() -- confirm
+    }
+
+    @AfterAll // shuts down the fixtures in reverse order of their creation in the setup
+    public void stop() {
+        salesforce.stop(); // the Salesforce component first,
+        camel.stop(); // then the Camel context it was created with,
+        server.stop(); // and the stub server last -- NOTE(review): the SubscriptionHelper is not stopped here
+    }
+
+    @Test // the consumer must keep receiving topic messages after the stub server is abruptly restarted
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer // also kept in toUnsubscribe so cleanSlate() can detach it later
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+        // subscribe the mocked consumer to the "Account" topic
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications ("messages" is what the stub server replies with on /connect)
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class), // NOTE(review): qualified here, static timeout(...) elsewhere
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // terminate server abruptly by closing the connection (sends FIN, ACK)
+        server.abruptlyRestart();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit (hence the 10 second timeout)
+        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer); // nothing beyond the calls verified above may have reached the consumer
+    }
+
+    @Test // the consumer must keep receiving topic messages after the helper itself is stopped and started again
+    void shouldResubscribeOnHelperRestart() {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer // also kept in toUnsubscribe so cleanSlate() can detach it later
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnHelperRestart:endpoint");
+
+        // subscribe the mocked consumer to the "Account" topic
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications ("messages" is what the stub server replies with on /connect)
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+        verify(consumer, timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1"));
+
+        // stop and start the subscription helper; the consumer is deliberately not re-subscribed by the test
+        subscription.stop();
+        subscription.start();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit (hence the 2 second timeout)
+        verify(consumer, timeout(2000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer); // nothing beyond the calls verified above may have reached the consumer
+    }
+}


[camel] 02/03: CAMEL-12871: release resources on stop

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ca55b7958ea393bf9b7815d25fbd21574819ddd5
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:41:17 2020 +0100

    CAMEL-12871: release resources on stop
    
    When SubscriptionHelper is stopped we need to remove all listeners and
    close channels this helper is listening on.
---
 .../internal/streaming/SubscriptionHelper.java     | 29 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..25e363b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.BayeuxClient.State;
 import org.cometd.client.transport.ClientTransport;
@@ -320,11 +321,33 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
+    // Removes the given listener from the named channel and releases that channel; does nothing without a client.
+    private void closeChannel(final String name, MessageListener listener) {
+        final boolean hasClient = client != null;
+        if (hasClient) {
+            final ClientSessionChannel target = client.getChannel(name);
+            target.removeListener(listener);
+            target.release();
+        }
+    }
+
     @Override
     protected void doStop() throws Exception {
-        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
-        client.getChannel(META_CONNECT).removeListener(connectListener);
-        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+        closeChannel(META_DISCONNECT, disconnectListener);
+        closeChannel(META_CONNECT, connectListener);
+        closeChannel(META_HANDSHAKE, handshakeListener);
+
+        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final SalesforceConsumer consumer = entry.getKey();
+            final String topic = consumer.getTopicName();
+
+            final MessageListener listener = entry.getValue();
+            closeChannel(getChannelName(topic), listener);
+        }
+
+        if (client == null) {
+            return;
+        }
 
         client.disconnect();
         boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);