You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/18 12:06:47 UTC

[camel] branch master updated (06fa1d8 -> 65dd97a)

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 06fa1d8  CAMEL-15969 - Added a note about the policy option
     new 8af5f2f  CAMEL-12871: stub server and test
     new 78ea9bb  CAMEL-12871: release resources on stop
     new 65dd97a  CAMEL-12871: disconnect on handshake failure

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/streaming/SubscriptionHelper.java     | 161 +++++----
 .../salesforce/internal/streaming/StubServer.java  | 295 ++++++++++++++++
 .../SubscriptionHelperIntegrationTest.java         | 372 +++++++++++++++++++++
 3 files changed, 770 insertions(+), 58 deletions(-)
 create mode 100644 components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
 create mode 100644 components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java


[camel] 03/03: CAMEL-12871: disconnect on handshake failure

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 65dd97a6b60dd29a490aaef796f0c7b92ba8b0f6
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:43:13 2020 +0100

    CAMEL-12871: disconnect on handshake failure
    
    If we can't connect and perform the handshake, disconnecting will
    trigger a client restart with back-off. Also, because the signal to
    restart can arrive on multiple threads, we need to guard against
    restarts happening in parallel.
---
 .../internal/streaming/SubscriptionHelper.java     | 132 ++++++++++++---------
 .../SubscriptionHelperIntegrationTest.java         |  24 ++--
 2 files changed, 89 insertions(+), 67 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 25e363b..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -93,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
+    private final AtomicBoolean restarting = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
         this.component = component;
@@ -173,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectError = (String) message.get(ERROR_FIELD);
                         connectException = getFailure(message);
 
+                        client.disconnect();
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -206,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_DISCONNECT).addListener(disconnectListener);
 
+        connect();
+    }
+
+    private void connect() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -229,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
 
     // launch an async task to restart
     private void restartClient() {
+        if (!restarting.compareAndSet(false, true)) {
+            return;
+        }
 
         // launch a new restart command
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
         httpClient.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                try {
+                    performClientRestart();
+                } finally {
+                    restarting.set(false);
+                }
+            }
+        });
+    }
 
-                LOG.info("Restarting on unexpected disconnect from Salesforce...");
-                boolean abort = false;
+    private void performClientRestart() {
+        if (isStoppingOrStopped()) {
+            return;
+        }
 
-                // wait for disconnect
-                LOG.debug("Waiting to disconnect...");
-                while (!client.isDisconnected()) {
-                    try {
-                        Thread.sleep(DISCONNECT_INTERVAL);
-                    } catch (InterruptedException e) {
-                        LOG.error("Aborting restart on interrupt!");
-                        abort = true;
-                    }
-                }
+        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        boolean abort = false;
+
+        // wait for disconnect
+        LOG.debug("Waiting to disconnect...");
+        while (!abort && !client.isDisconnected()) {
+            try {
+                Thread.sleep(DISCONNECT_INTERVAL);
+            } catch (InterruptedException e) {
+                LOG.error("Aborting restart on interrupt!");
+                abort = true;
+            }
 
-                if (!abort) {
+            abort = isStoppingOrStopped();
+        }
 
-                    // update restart attempt backoff
-                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
-                    if (backoff > maxBackoff) {
-                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
-                        abort = true;
-                    } else {
+        if (!abort) {
 
-                        // pause before restart attempt
-                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
-                        try {
-                            Thread.sleep(backoff);
-                        } catch (InterruptedException e) {
-                            LOG.error("Aborting restart on interrupt!");
-                            abort = true;
-                        }
-                    }
+            // update restart attempt backoff
+            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                abort = true;
+            } else {
 
-                    if (!abort) {
-                        Exception lastError = new SalesforceException("Unknown error", null);
-                        try {
-                            // reset client
-                            doStop();
+                // pause before restart attempt
+                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    LOG.error("Aborting restart on interrupt!");
+                    abort = true;
+                }
+            }
 
-                            // register listeners and restart
-                            doStart();
+            if (!abort) {
+                Exception lastError = new SalesforceException("Unknown error", null);
+                try {
+                    // reset client
+                    doStop();
 
-                        } catch (Exception e) {
-                            LOG.error("Error restarting: " + e.getMessage(), e);
-                            lastError = e;
-                        }
+                    // register listeners and restart
+                    doStart();
 
-                        if (client != null && client.isHandshook()) {
-                            LOG.info("Successfully restarted!");
-                            // reset backoff interval
-                            restartBackoff.set(client.getBackoffIncrement());
-                        } else {
-                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
-                            if ((backoff + backoffIncrement) > maxBackoff) {
-                                // notify all consumers
-                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
-                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                    consumer.handleException(abortMsg, ex);
-                                }
-                            }
+                } catch (Exception e) {
+                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    lastError = e;
+                }
+
+                if (client != null && client.isHandshook()) {
+                    LOG.info("Successfully restarted!");
+                    // reset backoff interval
+                    restartBackoff.set(client.getBackoffIncrement());
+                } else {
+                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    if ((backoff + backoffIncrement) > maxBackoff) {
+                        // notify all consumers
+                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                            consumer.handleException(abortMsg, ex);
                         }
                     }
                 }
-
             }
-        });
+        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 0491ca7..8c01631 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -127,18 +127,18 @@ public class SubscriptionHelperIntegrationTest {
 
         server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
                 req -> req.contains("\"timeout\":0"), "[\n"
-                                                        + "  {\n"
-                                                        + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
-                                                        + "    \"advice\": {\n"
-                                                        + "      \"interval\": 0,\n"
-                                                        + "      \"timeout\": 110000,\n"
-                                                        + "      \"reconnect\": \"retry\"\n"
-                                                        + "    },\n"
-                                                        + "    \"channel\": \"/meta/connect\",\n"
-                                                        + "    \"id\": \"$id\",\n"
-                                                        + "    \"successful\": true\n"
-                                                        + "  }\n"
-                                                        + "]");
+                                                      + "  {\n"
+                                                      + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
+                                                      + "    \"advice\": {\n"
+                                                      + "      \"interval\": 0,\n"
+                                                      + "      \"timeout\": 110000,\n"
+                                                      + "      \"reconnect\": \"retry\"\n"
+                                                      + "    },\n"
+                                                      + "    \"channel\": \"/meta/connect\",\n"
+                                                      + "    \"id\": \"$id\",\n"
+                                                      + "    \"successful\": true\n"
+                                                      + "  }\n"
+                                                      + "]");
 
         server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);
 


[camel] 01/03: CAMEL-12871: stub server and test

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8af5f2f31341e391064f8ba530bcc14fdafb38f4
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 22:09:23 2020 +0100

    CAMEL-12871: stub server and test
    
    Adds a stub server implemented in Jetty, as it is already pulled in as a
    dependency, and an integration test for testing streaming resiliency.
    
    Two cases are added in the integration test: the server closing the
    TCP connection (e.g. in an abrupt server shutdown), and restarting the
    `SubscriptionHelper` service (e.g. when a route is restarted).
---
 .../salesforce/internal/streaming/StubServer.java  | 295 ++++++++++++++++
 .../SubscriptionHelperIntegrationTest.java         | 372 +++++++++++++++++++++
 2 files changed, 667 insertions(+)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
new file mode 100644
index 0000000..76d5d1b
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.camel.util.IOHelper;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.AbstractNetworkConnector;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StubServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StubServer.class);
+
+    private final List<StubResponse> defaultStubs = new ArrayList<>();
+
+    private final Server server;
+
+    private final List<StubResponse> stubs = new ArrayList<>();
+
+    class StubHandler extends AbstractHandler {
+
+        @Override
+        public void handle(
+                final String target, final Request baseRequest, final HttpServletRequest request,
+                final HttpServletResponse response)
+                throws IOException, ServletException {
+            final String body;
+            try (Reader bodyReader = request.getReader()) {
+                body = IOHelper.toString(bodyReader);
+            }
+
+            final StubResponse stub = stubFor(request, body);
+
+            if (stub == null) {
+                LOG.error("Stub not found for {} {}", request.getMethod(), request.getRequestURI());
+                response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
+                return;
+            }
+
+            response.setStatus(stub.responseStatus);
+            response.setContentType("application/json;charset=UTF-8");
+
+            final String id = messageIdFrom(body);
+
+            try (Writer out = response.getWriter()) {
+                stub.writeTo(id, out);
+            }
+        }
+
+        private StubResponse stubFor(final HttpServletRequest request, final String body) throws IOException {
+            final List<StubResponse> allResponses = new ArrayList<>(defaultStubs);
+            allResponses.addAll(stubs);
+
+            for (final StubResponse stub : allResponses) {
+                if (stub.matches(request, body)) {
+                    return stub;
+                }
+            }
+
+            return null;
+        }
+
+    }
+
+    final class StubResponse {
+
+        private Predicate<String> requestCondition;
+
+        private final String requestMethod;
+
+        private final String requestPath;
+
+        private BlockingQueue<String> responseMessages;
+
+        private final int responseStatus;
+
+        private String responseString;
+
+        public StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                            final Predicate<String> requestCondition,
+                            final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus) {
+            this.responseStatus = responseStatus;
+            this.requestMethod = Objects.requireNonNull(requestMethod, "requestMethod");
+            this.requestPath = Objects.requireNonNull(requestPath, "requestPath");
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.requestCondition = requestCondition;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseString = responseString;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus);
+            this.responseString = responseString;
+        }
+
+        @Override
+        public String toString() {
+            return requestMethod + " " + requestPath;
+        }
+
+        private boolean matches(final HttpServletRequest request, final String body) throws IOException {
+            final boolean matches = Objects.equals(requestMethod, request.getMethod())
+                    && Objects.equals(requestPath, request.getRequestURI());
+
+            if (!matches) {
+                return false;
+            }
+
+            if (requestCondition == null) {
+                return true;
+            }
+
+            return requestCondition.test(body);
+        }
+
+        private void writeTo(final String messageId, final Writer out) throws IOException {
+            if (responseString != null) {
+                out.write(responseString.replace("$id", messageId));
+                out.flush();
+                return;
+            }
+
+            if (responseMessages != null) {
+                while (true) {
+                    try {
+                        final String message = responseMessages.poll(25, TimeUnit.MILLISECONDS);
+                        if (message != null) {
+                            out.write(message.replace("$id", messageId));
+                            out.flush();
+                            return;
+                        }
+
+                        if (!server.isRunning()) {
+                            return;
+                        }
+                    } catch (final InterruptedException ignored) {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    public StubServer() {
+        server = new Server(0);
+        server.setHandler(new StubHandler());
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public void abruptlyRestart() {
+        final int port = port();
+
+        stop();
+
+        connector().setPort(port);
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public int port() {
+        return connector().getLocalPort();
+    }
+
+    public void replyTo(final String method, final String path, final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, messages));
+    }
+
+    public void replyTo(final String method, final String path, final int status) {
+        stubs.add(new StubResponse(method, path, status));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition,
+            final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, messages));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition, final String response) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, response));
+    }
+
+    public void replyTo(final String method, final String path, final String response) {
+        stubs.add(new StubResponse(method, path, 200, response));
+    }
+
+    public void reset() {
+        stubs.clear();
+    }
+
+    public void stop() {
+        try {
+            for (final EndPoint endPoint : connector().getConnectedEndPoints()) {
+                endPoint.close();
+            }
+
+            server.stop();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void stubsAsDefaults() {
+        defaultStubs.addAll(stubs);
+        stubs.clear();
+    }
+
+    private AbstractNetworkConnector connector() {
+        final AbstractNetworkConnector connector = (AbstractNetworkConnector) server.getConnectors()[0];
+        return connector;
+    }
+
+    private static String messageIdFrom(final String body) {
+        int idx = body.indexOf("\"id\":\"");
+        String id = "";
+
+        if (idx > 0) {
+            idx += 6;
+            char ch;
+            while (Character.isDigit(ch = body.charAt(idx++))) {
+                id += ch;
+            }
+        }
+        return id;
+    }
+
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
new file mode 100644
index 0000000..0491ca7
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.component.salesforce.AuthenticationType;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class SubscriptionHelperIntegrationTest {
+
+    final CamelContext camel;
+
+    final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
+
+    final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
+
+    final SalesforceComponent salesforce;
+
+    final StubServer server;
+
+    final SubscriptionHelper subscription;
+
+    SalesforceConsumer toUnsubscribe;
+
+    /**
+     * Mockito matcher accepting messages whose data holds an {@code event} of type {@code created} and an
+     * {@code sobject} whose {@code Name} equals the expected name. Messages that lack this structure are reported
+     * as a mismatch rather than failing with an exception.
+     */
+    static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
+
+        private final String name;
+
+        public MessageArgumentMatcher(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public boolean matches(final Message message) {
+            if (message == null) {
+                return false;
+            }
+
+            final Map<String, Object> data = message.getDataAsMap();
+            if (data == null) {
+                return false;
+            }
+
+            final Object event = data.get("event");
+            final Object sobject = data.get("sobject");
+            if (!(event instanceof Map) || !(sobject instanceof Map)) {
+                // not shaped like a record notification, so it cannot be the expected message
+                return false;
+            }
+
+            final Object type = ((Map<?, ?>) event).get("type");
+            final Object sobjectName = ((Map<?, ?>) sobject).get("Name");
+
+            return "created".equals(type) && name.equals(sobjectName);
+        }
+
+        /**
+         * Describes the expectation so that Mockito verification failures are readable.
+         */
+        @Override
+        public String toString() {
+            return "message for creation of an sObject named \"" + name + "\"";
+        }
+
+        /**
+         * Registers a matcher for the given name with Mockito, for use as an argument in stubbing or verification.
+         */
+        static Message messageForAccountCreationWithName(final String name) {
+            return argThat(new MessageArgumentMatcher(name));
+        }
+
+    }
+
+    /**
+     * Creates the stub server and registers canned replies for the OAuth token and revoke requests and for the
+     * CometD handshake, connect, subscribe, unsubscribe and disconnect requests, promoting them to default stubs.
+     * Then starts a Camel context and a Salesforce component that logs in against the stub server, and creates the
+     * {@link SubscriptionHelper} under test.
+     *
+     * @throws SalesforceException declared by the set-up; which call raises it is not visible here
+     */
+    public SubscriptionHelperIntegrationTest() throws SalesforceException {
+        server = new StubServer();
+
+        LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class)
+                .info("Port for wireshark to filter: {}", server.port());
+
+        final String instanceUrl = "http://localhost:" + server.port();
+
+        // all CometD requests share the same versioned path prefix
+        final String cometd = "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION;
+
+        final String tokenReply = "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}";
+        server.replyTo("POST", "/services/oauth2/token", tokenReply);
+
+        server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
+
+        final String handshakeReply = "[\n"
+                + "  {\n"
+                + "    \"ext\": {\n"
+                + "      \"replay\": true,\n"
+                + "      \"payload.format\": true\n"
+                + "    },\n"
+                + "    \"minimumVersion\": \"1.0\",\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"supportedConnectionTypes\": [\n"
+                + "      \"long-polling\"\n"
+                + "    ],\n"
+                + "    \"channel\": \"/meta/handshake\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"version\": \"1.0\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        server.replyTo("POST", cometd + "/handshake", handshakeReply);
+
+        // reply used only for connect requests that satisfy the "timeout":0 condition
+        final String immediateConnectReply = "[\n"
+                + "  {\n"
+                + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
+                + "    \"advice\": {\n"
+                + "      \"interval\": 0,\n"
+                + "      \"timeout\": 110000,\n"
+                + "      \"reconnect\": \"retry\"\n"
+                + "    },\n"
+                + "    \"channel\": \"/meta/connect\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        server.replyTo("POST", cometd + "/connect", request -> request.contains("\"timeout\":0"),
+                immediateConnectReply);
+
+        // remaining connect requests are answered from the queue the tests push messages into
+        server.replyTo("POST", cometd + "/connect", messages);
+
+        final String subscribeReply = "[\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/subscribe\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"subscription\": \"/topic/Account\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        server.replyTo("POST", cometd + "/subscribe", subscribeReply);
+
+        final String unsubscribeReply = "[\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/unsubscribe\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"subscription\": \"/topic/Account\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        server.replyTo("POST", cometd + "/unsubscribe", unsubscribeReply);
+
+        final String disconnectReply = "[\n"
+                + "  {\n"
+                + "     \"channel\": \"/meta/disconnect\",\n"
+                + "     \"clientId\": \"client-id\"\n"
+                + "   }\n"
+                + "]";
+        server.replyTo("POST", cometd + "/disconnect", disconnectReply);
+
+        server.replyTo("GET", "/services/oauth2/revoke", 200);
+
+        // everything registered above survives server.reset() between tests
+        server.stubsAsDefaults();
+
+        camel = new DefaultCamelContext();
+        camel.start();
+
+        salesforce = new SalesforceComponent(camel);
+        salesforce.setLoginUrl(instanceUrl);
+        salesforce.setClientId("clientId");
+        salesforce.setClientSecret("clientSecret");
+        salesforce.setRefreshToken("refreshToken");
+        salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
+        salesforce.setConfig(config);
+        salesforce.start();
+
+        subscription = new SubscriptionHelper(salesforce);
+    }
+
+    /**
+     * Runs before each test: unsubscribes the consumer remembered in {@code toUnsubscribe}, if any, from the
+     * "Account" topic and then resets the stub server.
+     *
+     * @throws CamelException declared for the unsubscribe call
+     */
+    @BeforeEach
+    public void cleanSlate() throws CamelException {
+        final SalesforceConsumer previous = toUnsubscribe;
+        if (previous != null) {
+            subscription.unsubscribe("Account", previous);
+        }
+
+        server.reset();
+    }
+
+    /**
+     * Runs once after all tests: stops the Salesforce component, then the Camel context, then the stub server.
+     */
+    @AfterAll
+    public void stop() {
+        salesforce.stop();
+        camel.stop();
+        server.stop();
+    }
+
+    /**
+     * Checks that a consumer subscribed to the "Account" topic still receives messages after the stub server is
+     * abruptly restarted.
+     */
+    @Test
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+        // remembered so that the next cleanSlate() can unsubscribe it
+        toUnsubscribe = consumer;
+
+        final SalesforceEndpoint endpoint
+                = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+        // stub what the helper may ask of the consumer and its endpoint, then subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know the connection is established and the
+        // consumer receives notifications
+        final String beforeRestart = "[\n"
+                + "  {\n"
+                + "    \"data\": {\n"
+                + "      \"event\": {\n"
+                + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                + "        \"replayId\": 1,\n"
+                + "        \"type\": \"created\"\n"
+                + "      },\n"
+                + "      \"sobject\": {\n"
+                + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+                + "      }\n"
+                + "    },\n"
+                + "    \"channel\": \"/topic/Account\"\n"
+                + "  },\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/connect\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        messages.add(beforeRestart);
+
+        verify(consumer, timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // terminate server abruptly by closing the connection (sends FIN, ACK)
+        server.abruptlyRestart();
+
+        // queue the next message for when the client recovers
+        final String afterRestart = "[\n"
+                + "  {\n"
+                + "    \"data\": {\n"
+                + "      \"event\": {\n"
+                + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                + "        \"replayId\": 2,\n"
+                + "        \"type\": \"created\"\n"
+                + "      },\n"
+                + "      \"sobject\": {\n"
+                + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+                + "      }\n"
+                + "    },\n"
+                + "    \"channel\": \"/topic/Account\"\n"
+                + "  },\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/connect\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        messages.add(afterRestart);
+
+        // assert the last message was received, recovery can take a bit
+        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        // apart from the lookups below, nothing else may have been called on the consumer
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+
+    /**
+     * Checks that a consumer subscribed to the "Account" topic receives messages again after the subscription
+     * helper has been stopped and started.
+     */
+    @Test
+    void shouldResubscribeOnHelperRestart() {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = mock(SalesforceConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
+        // remembered so that the next cleanSlate() can unsubscribe it
+        toUnsubscribe = consumer;
+
+        final SalesforceEndpoint endpoint
+                = mock(SalesforceEndpoint.class, "shouldResubscribeOnHelperRestart:endpoint");
+
+        // stub what the helper may ask of the consumer and its endpoint, then subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know the connection is established and the
+        // consumer receives notifications
+        final String beforeRestart = "[\n"
+                + "  {\n"
+                + "    \"data\": {\n"
+                + "      \"event\": {\n"
+                + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                + "        \"replayId\": 1,\n"
+                + "        \"type\": \"created\"\n"
+                + "      },\n"
+                + "      \"sobject\": {\n"
+                + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                + "        \"Name\": \"shouldResubscribeOnHelperRestart 1\"\n"
+                + "      }\n"
+                + "    },\n"
+                + "    \"channel\": \"/topic/Account\"\n"
+                + "  },\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/connect\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        messages.add(beforeRestart);
+
+        verify(consumer, timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1"));
+
+        // stop and start the subscription helper
+        subscription.stop();
+        subscription.start();
+
+        // queue the next message for when the client recovers
+        final String afterRestart = "[\n"
+                + "  {\n"
+                + "    \"data\": {\n"
+                + "      \"event\": {\n"
+                + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                + "        \"replayId\": 2,\n"
+                + "        \"type\": \"created\"\n"
+                + "      },\n"
+                + "      \"sobject\": {\n"
+                + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                + "        \"Name\": \"shouldResubscribeOnHelperRestart 2\"\n"
+                + "      }\n"
+                + "    },\n"
+                + "    \"channel\": \"/topic/Account\"\n"
+                + "  },\n"
+                + "  {\n"
+                + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                + "    \"channel\": \"/meta/connect\",\n"
+                + "    \"id\": \"$id\",\n"
+                + "    \"successful\": true\n"
+                + "  }\n"
+                + "]";
+        messages.add(afterRestart);
+
+        // assert the last message was received, recovery can take a bit
+        verify(consumer, timeout(2000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2"));
+
+        // apart from the lookups below, nothing else may have been called on the consumer
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+}


[camel] 02/03: CAMEL-12871: release resources on stop

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 78ea9bb3d2155e6391f238e9da6da8bb826009c1
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 14:41:17 2020 +0100

    CAMEL-12871: release resources on stop
    
    When SubscriptionHelper is stopped, we need to remove all listeners and
    close the channels this helper is listening on.
---
 .../internal/streaming/SubscriptionHelper.java     | 29 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..25e363b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.BayeuxClient.State;
 import org.cometd.client.transport.ClientTransport;
@@ -320,11 +321,33 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
+    /**
+     * Removes the given listener from the named channel and releases that channel. Does nothing when there is no
+     * client.
+     *
+     * @param name     name of the channel to look up on the client
+     * @param listener listener to remove from that channel
+     */
+    private void closeChannel(final String name, MessageListener listener) {
+        if (client != null) {
+            final ClientSessionChannel sessionChannel = client.getChannel(name);
+            sessionChannel.removeListener(listener);
+            sessionChannel.release();
+        }
+    }
+
     @Override
     protected void doStop() throws Exception {
-        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
-        client.getChannel(META_CONNECT).removeListener(connectListener);
-        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+        closeChannel(META_DISCONNECT, disconnectListener);
+        closeChannel(META_CONNECT, connectListener);
+        closeChannel(META_HANDSHAKE, handshakeListener);
+
+        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final SalesforceConsumer consumer = entry.getKey();
+            final String topic = consumer.getTopicName();
+
+            final MessageListener listener = entry.getValue();
+            closeChannel(getChannelName(topic), listener);
+        }
+
+        if (client == null) {
+            return;
+        }
 
         client.disconnect();
         boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);