You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2020/12/16 09:50:35 UTC

[camel] branch issue/CAMEL-12871 updated (27db25d -> 94a7432)

This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a change to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git.


 discard 27db25d  TEMP BACK OUT: Fiddler workaround
 discard 7808691  CAMEL-12871: release resources on stop (WIP)
 discard 21aa7102 CAMEL-12871: stub server and test
     new f1835b2  CAMEL-12871: stub server and test
     new cacb34f  CAMEL-12871: release resources on stop (WIP)
     new 94a7432  TEMP BACK OUT: Fiddler workaround

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (27db25d)
            \
             N -- N -- N   refs/heads/issue/CAMEL-12871 (94a7432)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SalesforceHttpClientTransportOverHTTP.java     |   8 +-
 .../salesforce/internal/streaming/StubServer.java  |  34 ++-
 .../SubscriptionHelperIntegrationTest.java         | 291 +++++++++++----------
 3 files changed, 175 insertions(+), 158 deletions(-)


[camel] 02/03: CAMEL-12871: release resources on stop (WIP)

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cacb34fdf3a48358125d364b1caf69cd7348c548
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 10:15:31 2020 +0100

    CAMEL-12871: release resources on stop (WIP)
    
    When SubscriptionHelper is stopped we need to remove all listeners and
    close channels this helper is listening on.
---
 .../internal/streaming/SubscriptionHelper.java     | 161 +++++++++++++--------
 1 file changed, 103 insertions(+), 58 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -36,6 +37,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.BayeuxClient.State;
 import org.cometd.client.transport.ClientTransport;
@@ -92,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
+    private final AtomicBoolean restarting = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
         this.component = component;
@@ -172,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectError = (String) message.get(ERROR_FIELD);
                         connectException = getFailure(message);
 
+                        client.disconnect();
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -205,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_DISCONNECT).addListener(disconnectListener);
 
+        connect();
+    }
+
+    private void connect() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -228,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
 
     // launch an async task to restart
     private void restartClient() {
+        if (!restarting.compareAndSet(false, true)) {
+            return;
+        }
 
         // launch a new restart command
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
         httpClient.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                try {
+                    performClientRestart();
+                } finally {
+                    restarting.set(false);
+                }
+            }
+        });
+    }
 
-                LOG.info("Restarting on unexpected disconnect from Salesforce...");
-                boolean abort = false;
+    private void performClientRestart() {
+        if (isStoppingOrStopped()) {
+            return;
+        }
 
-                // wait for disconnect
-                LOG.debug("Waiting to disconnect...");
-                while (!client.isDisconnected()) {
-                    try {
-                        Thread.sleep(DISCONNECT_INTERVAL);
-                    } catch (InterruptedException e) {
-                        LOG.error("Aborting restart on interrupt!");
-                        abort = true;
-                    }
-                }
+        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        boolean abort = false;
+
+        // wait for disconnect
+        LOG.debug("Waiting to disconnect...");
+        while (!abort && !client.isDisconnected()) {
+            try {
+                Thread.sleep(DISCONNECT_INTERVAL);
+            } catch (InterruptedException e) {
+                LOG.error("Aborting restart on interrupt!");
+                abort = true;
+            }
 
-                if (!abort) {
+            abort = isStoppingOrStopped();
+        }
 
-                    // update restart attempt backoff
-                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
-                    if (backoff > maxBackoff) {
-                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
-                        abort = true;
-                    } else {
+        if (!abort) {
 
-                        // pause before restart attempt
-                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
-                        try {
-                            Thread.sleep(backoff);
-                        } catch (InterruptedException e) {
-                            LOG.error("Aborting restart on interrupt!");
-                            abort = true;
-                        }
-                    }
+            // update restart attempt backoff
+            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                abort = true;
+            } else {
 
-                    if (!abort) {
-                        Exception lastError = new SalesforceException("Unknown error", null);
-                        try {
-                            // reset client
-                            doStop();
+                // pause before restart attempt
+                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    LOG.error("Aborting restart on interrupt!");
+                    abort = true;
+                }
+            }
 
-                            // register listeners and restart
-                            doStart();
+            if (!abort) {
+                Exception lastError = new SalesforceException("Unknown error", null);
+                try {
+                    // reset client
+                    doStop();
 
-                        } catch (Exception e) {
-                            LOG.error("Error restarting: " + e.getMessage(), e);
-                            lastError = e;
-                        }
+                    // register listeners and restart
+                    doStart();
 
-                        if (client != null && client.isHandshook()) {
-                            LOG.info("Successfully restarted!");
-                            // reset backoff interval
-                            restartBackoff.set(client.getBackoffIncrement());
-                        } else {
-                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
-                            if ((backoff + backoffIncrement) > maxBackoff) {
-                                // notify all consumers
-                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
-                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                    consumer.handleException(abortMsg, ex);
-                                }
-                            }
+                } catch (Exception e) {
+                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    lastError = e;
+                }
+
+                if (client != null && client.isHandshook()) {
+                    LOG.info("Successfully restarted!");
+                    // reset backoff interval
+                    restartBackoff.set(client.getBackoffIncrement());
+                } else {
+                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    if ((backoff + backoffIncrement) > maxBackoff) {
+                        // notify all consumers
+                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                            consumer.handleException(abortMsg, ex);
                         }
                     }
                 }
-
             }
-        });
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -320,11 +343,33 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
+    private void closeChannel(final String name, MessageListener listener) {
+        if (client == null) {
+            return;
+        }
+
+        final ClientSessionChannel channel = client.getChannel(name);
+        channel.removeListener(listener);
+        channel.release();
+    }
+
     @Override
     protected void doStop() throws Exception {
-        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
-        client.getChannel(META_CONNECT).removeListener(connectListener);
-        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+        closeChannel(META_DISCONNECT, disconnectListener);
+        closeChannel(META_CONNECT, connectListener);
+        closeChannel(META_HANDSHAKE, handshakeListener);
+
+        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final SalesforceConsumer consumer = entry.getKey();
+            final String topic = consumer.getTopicName();
+
+            final MessageListener listener = entry.getValue();
+            closeChannel(getChannelName(topic), listener);
+        }
+
+        if (client == null) {
+            return;
+        }
 
         client.disconnect();
         boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);


[camel] 01/03: CAMEL-12871: stub server and test

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f1835b20b025a4ab2709e30ea633dd05ce9aa823
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Wed Dec 16 10:49:24 2020 +0100

    CAMEL-12871: stub server and test
    
    Adds a stub server implemented in Jetty, as it is already pulled in as a
    dependency, and a integration test for the testing streaming resiliency.
    
    Two cases are added in the integration test: server closing the
    TCP connection (e.g. in a abrupt server shutdown), and restarting the
    `SubscriptionHelper` service (e.g. when route is restarted).
---
 .../salesforce/internal/streaming/StubServer.java  | 295 ++++++++++++++++
 .../SubscriptionHelperIntegrationTest.java         | 372 +++++++++++++++++++++
 2 files changed, 667 insertions(+)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
new file mode 100644
index 0000000..56bc8cf
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/StubServer.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.camel.util.IOHelper;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.AbstractNetworkConnector;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StubServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StubServer.class);
+
+    private final List<StubResponse> defaultStubs = new ArrayList<>();
+
+    private final Server server;
+
+    private final List<StubResponse> stubs = new ArrayList<>();
+
+    class StubHandler extends AbstractHandler {
+
+        @Override
+        public void handle(
+                final String target, final Request baseRequest, final HttpServletRequest request,
+                final HttpServletResponse response)
+                throws IOException, ServletException {
+            final String body;
+            try (Reader bodyReader = request.getReader()) {
+                body = IOHelper.toString(bodyReader);
+            }
+
+            final StubResponse stub = stubFor(request, body);
+
+            if (stub == null) {
+                LOG.error("Stub not found for {} {}", request.getMethod(), request.getRequestURI());
+                response.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED);
+                return;
+            }
+
+            response.setStatus(stub.responseStatus);
+            response.setContentType("application/json;charset=UTF-8");
+
+            final String id = messageIdFrom(body);
+
+            try (Writer out = response.getWriter()) {
+                stub.writeTo(id, out);
+            }
+        }
+
+        private StubResponse stubFor(final HttpServletRequest request, final String body) throws IOException {
+            final List<StubResponse> allResponses = new ArrayList<>(defaultStubs);
+            allResponses.addAll(stubs);
+
+            for (final StubResponse stub : allResponses) {
+                if (stub.matches(request, body)) {
+                    return stub;
+                }
+            }
+
+            return null;
+        }
+
+    }
+
+    final class StubResponse {
+
+        private Predicate<String> requestCondition;
+
+        private final String requestMethod;
+
+        private final String requestPath;
+
+        private BlockingQueue<String> responseMessages;
+
+        private final int responseStatus;
+
+        private String responseString;
+
+        public StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                            final Predicate<String> requestCondition,
+                            final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus) {
+            this.responseStatus = responseStatus;
+            this.requestMethod = Objects.requireNonNull(requestMethod, "requestMethod");
+            this.requestPath = Objects.requireNonNull(requestPath, "requestPath");
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final BlockingQueue<String> responseMessages) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.responseMessages = responseMessages;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition) {
+            this(requestMethod, requestPath, responseStatus);
+
+            this.requestCondition = requestCondition;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final Predicate<String> requestCondition,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus, requestCondition);
+
+            this.responseString = responseString;
+        }
+
+        private StubResponse(final String requestMethod, final String requestPath, final int responseStatus,
+                             final String responseString) {
+            this(requestMethod, requestPath, responseStatus);
+            this.responseString = responseString;
+        }
+
+        @Override
+        public String toString() {
+            return requestMethod + " " + requestPath;
+        }
+
+        private boolean matches(final HttpServletRequest request, final String body) throws IOException {
+            final boolean matches = Objects.equals(requestMethod, request.getMethod())
+                    && Objects.equals(requestPath, request.getRequestURI());
+
+            if (!matches) {
+                return false;
+            }
+
+            if (requestCondition == null) {
+                return true;
+            }
+
+            return requestCondition.test(body);
+        }
+
+        private void writeTo(final String messageId, final Writer out) throws IOException {
+            if (responseString != null) {
+                out.write(responseString.replace("$id", messageId));
+                out.flush();
+                return;
+            }
+
+            if (responseMessages != null) {
+                while (true) {
+                    try {
+                        final String message = responseMessages.poll(25, TimeUnit.MILLISECONDS);
+                        if (message != null) {
+                            out.write(message.replace("$id", messageId));
+                            out.flush();
+                            return;
+                        }
+
+                        if (!server.isRunning()) {
+                            return;
+                        }
+                    } catch (final InterruptedException ignored) {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    public StubServer() {
+        server = new Server(0);
+        server.setHandler(new StubHandler());
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public void abruptlyRestart() {
+        final int port = port();
+
+        stop();
+
+        connector().setPort(port);
+
+        try {
+            server.start();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public int port() {
+        return connector().getLocalPort();
+    }
+
+    public void replyTo(final String method, final String path, final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, messages));
+    }
+
+    public void replyTo(final String method, final String path, final int status) {
+        stubs.add(new StubResponse(method, path, status));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition,
+            final BlockingQueue<String> messages) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, messages));
+    }
+
+    public void replyTo(
+            final String method, final String path, final Predicate<String> requestCondition, final String response) {
+        stubs.add(new StubResponse(method, path, 200, requestCondition, response));
+    }
+
+    public void replyTo(final String method, final String path, final String response) {
+        stubs.add(new StubResponse(method, path, 200, response));
+    }
+
+    public void reset() {
+        stubs.clear();
+    }
+
+    public void stop() {
+        try {
+            for (final EndPoint endPoint : connector().getConnectedEndPoints()) {
+                endPoint.close();
+            }
+
+            server.stop();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void stubsAsDefaults() {
+        defaultStubs.addAll(stubs);
+        stubs.clear();
+    }
+
+    private AbstractNetworkConnector connector() {
+        final AbstractNetworkConnector connector = (AbstractNetworkConnector) server.getConnectors()[0];
+        return connector;
+    }
+
+    private static String messageIdFrom(final String body) {
+        int idx = body.indexOf("\"id\":\"");
+        String id = "";
+
+        if (idx > 0) {
+            idx += 6;
+            char ch;
+            while (Character.isDigit(ch = body.charAt(idx++))) {
+                id += ch;
+            }
+        }
+        return id;
+    }
+
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
new file mode 100644
index 0000000..64cc2d5
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.component.salesforce.AuthenticationType;
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelperIntegrationTest.MessageArgumentMatcher.messageForAccountCreationWithName;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class SubscriptionHelperIntegrationTest {
+
+    final CamelContext camel;
+
+    final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
+
+    final BlockingQueue<String> messages = new LinkedBlockingDeque<>();
+
+    final SalesforceComponent salesforce;
+
+    final StubServer server;
+
+    final SubscriptionHelper subscription;
+
+    SalesforceConsumer toUnsubscribe;
+
+    static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
+
+        private final String name;
+
+        public MessageArgumentMatcher(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public boolean matches(final Message message) {
+            final Map<String, Object> data = message.getDataAsMap();
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> event = (Map<String, Object>) data.get("event");
+
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> sobject = (Map<String, Object>) data.get("sobject");
+
+            return "created".equals(event.get("type")) && name.equals(sobject.get("Name"));
+        }
+
+        static Message messageForAccountCreationWithName(final String name) {
+            return argThat(new MessageArgumentMatcher(name));
+        }
+
+    }
+
+    public SubscriptionHelperIntegrationTest() throws SalesforceException {
+        server = new StubServer();
+
+        LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
+                server.port());
+
+        final String instanceUrl = "http://localhost:" + server.port();
+
+        server.replyTo("POST", "/services/oauth2/token",
+                "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
+
+        server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
+                                                                                                     + "  {\n"
+                                                                                                     + "    \"ext\": {\n"
+                                                                                                     + "      \"replay\": true,\n"
+                                                                                                     + "      \"payload.format\": true\n"
+                                                                                                     + "    },\n"
+                                                                                                     + "    \"minimumVersion\": \"1.0\",\n"
+                                                                                                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                     + "    \"supportedConnectionTypes\": [\n"
+                                                                                                     + "      \"long-polling\"\n"
+                                                                                                     + "    ],\n"
+                                                                                                     + "    \"channel\": \"/meta/handshake\",\n"
+                                                                                                     + "    \"id\": \"$id\",\n"
+                                                                                                     + "    \"version\": \"1.0\",\n"
+                                                                                                     + "    \"successful\": true\n"
+                                                                                                     + "  }\n"
+                                                                                                     + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
+                (req) -> req.contains("\"timeout\":0"), "[\n"
+                                                        + "  {\n"
+                                                        + "    \"clientId\": \"1f0agp5a95yiaeb1kifib37r5z4g\",\n"
+                                                        + "    \"advice\": {\n"
+                                                        + "      \"interval\": 0,\n"
+                                                        + "      \"timeout\": 110000,\n"
+                                                        + "      \"reconnect\": \"retry\"\n"
+                                                        + "    },\n"
+                                                        + "    \"channel\": \"/meta/connect\",\n"
+                                                        + "    \"id\": \"$id\",\n"
+                                                        + "    \"successful\": true\n"
+                                                        + "  }\n"
+                                                        + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect", messages);
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/subscribe", "[\n"
+                                                                                                     + "  {\n"
+                                                                                                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                     + "    \"channel\": \"/meta/subscribe\",\n"
+                                                                                                     + "    \"id\": \"$id\",\n"
+                                                                                                     + "    \"subscription\": \"/topic/Account\",\n"
+                                                                                                     + "    \"successful\": true\n"
+                                                                                                     + "  }\n"
+                                                                                                     + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/unsubscribe", "[\n"
+                                                                                                       + "  {\n"
+                                                                                                       + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                                       + "    \"channel\": \"/meta/unsubscribe\",\n"
+                                                                                                       + "    \"id\": \"$id\",\n"
+                                                                                                       + "    \"subscription\": \"/topic/Account\",\n"
+                                                                                                       + "    \"successful\": true\n"
+                                                                                                       + "  }\n"
+                                                                                                       + "]");
+
+        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/disconnect", "[\n"
+                                                                                                      + "  {\n"
+                                                                                                      + "     \"channel\": \"/meta/disconnect\",\n"
+                                                                                                      + "     \"clientId\": \"client-id\"\n"
+                                                                                                      + "   }\n"
+                                                                                                      + "]");
+
+        server.replyTo("GET", "/services/oauth2/revoke", 200);
+
+        server.stubsAsDefaults();
+
+        camel = new DefaultCamelContext();
+        camel.start();
+        salesforce = new SalesforceComponent(camel);
+        salesforce.setLoginUrl(instanceUrl);
+        salesforce.setClientId("clientId");
+        salesforce.setClientSecret("clientSecret");
+        salesforce.setRefreshToken("refreshToken");
+        salesforce.setAuthenticationType(AuthenticationType.REFRESH_TOKEN);
+        salesforce.setConfig(config);
+
+        salesforce.start();
+        subscription = new SubscriptionHelper(salesforce);
+    }
+
+    @BeforeEach
+    public void cleanSlate() throws CamelException {
+        if (toUnsubscribe != null) {
+            subscription.unsubscribe("Account", toUnsubscribe);
+        }
+        server.reset();
+    }
+
+    @AfterAll
+    public void stop() {
+        salesforce.stop();
+        camel.stop();
+        server.stop();
+    }
+
+    @Test
+    void shouldResubscribeOnConnectionFailures() throws InterruptedException {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        verify(consumer, Mockito.timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 1"));
+
+        // terminate server abruptly by closing the connection (sends FIN, ACK)
+        server.abruptlyRestart();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnConnectionFailures 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, timeout(10000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnConnectionFailures 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+
+    @Test
+    void shouldResubscribeOnHelperRestart() {
+        // handshake and connect
+        subscription.start();
+
+        final SalesforceConsumer consumer
+                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
+
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnHelperRestart:endpoint");
+
+        // subscribe
+        when(consumer.getTopicName()).thenReturn("Account");
+
+        when(consumer.getEndpoint()).thenReturn(endpoint);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(endpoint.getComponent()).thenReturn(salesforce);
+        when(endpoint.getTopicName()).thenReturn("Account");
+
+        subscription.subscribe("Account", consumer);
+
+        // push one message so we know connection is established and consumer
+        // receives notifications
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 1,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 1\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+        verify(consumer, timeout(100)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 1"));
+
+        // stop and start the subscription helper
+        subscription.stop();
+        subscription.start();
+
+        // queue next message for when the client recovers
+        messages.add("[\n"
+                     + "  {\n"
+                     + "    \"data\": {\n"
+                     + "      \"event\": {\n"
+                     + "        \"createdDate\": \"2020-12-11T13:44:56.891Z\",\n"
+                     + "        \"replayId\": 2,\n"
+                     + "        \"type\": \"created\"\n"
+                     + "      },\n"
+                     + "      \"sobject\": {\n"
+                     + "        \"Id\": \"0011n00002XWMgVAAX\",\n"
+                     + "        \"Name\": \"shouldResubscribeOnHelperRestart 2\"\n"
+                     + "      }\n"
+                     + "    },\n"
+                     + "    \"channel\": \"/topic/Account\"\n"
+                     + "  },\n"
+                     + "  {\n"
+                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                     + "    \"channel\": \"/meta/connect\",\n"
+                     + "    \"id\": \"$id\",\n"
+                     + "    \"successful\": true\n"
+                     + "  }\n"
+                     + "]");
+
+        // assert last message was received, recovery can take a bit
+        verify(consumer, timeout(2000)).processMessage(any(ClientSessionChannel.class),
+                messageForAccountCreationWithName("shouldResubscribeOnHelperRestart 2"));
+
+        verify(consumer, atLeastOnce()).getEndpoint();
+        verify(consumer, atLeastOnce()).getTopicName();
+        verifyNoMoreInteractions(consumer);
+    }
+}


[camel] 03/03: TEMP BACK OUT: Fiddler workaround

Posted by zr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 94a7432256c71979a77f75ba99a3c123429a021a
Author: Zoran Regvart <zr...@apache.org>
AuthorDate: Mon Dec 14 13:52:20 2020 +0100

    TEMP BACK OUT: Fiddler workaround
    
    This allows proxying HTTPS via HTTP proxy tunneling (`CONNECT`) with
    Fiddler. Fiddler is sending `Connection: close` which closes the tunnel
    and abruptly terminates the proxy tunnel connection. This removes the
    `Connection: close` header in that case.
    
    This change needs to be backed out, and not merged to mainline.
---
 .../component/salesforce/SalesforceHttpClient.java |  3 +-
 .../SalesforceHttpClientTransportOverHTTP.java     | 79 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
index 0fce4bb..293d4c2 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -30,7 +30,6 @@ import org.eclipse.jetty.client.HttpConversation;
 import org.eclipse.jetty.client.HttpRequest;
 import org.eclipse.jetty.client.ProtocolHandler;
 import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
 import static java.util.Optional.ofNullable;
@@ -64,7 +63,7 @@ public class SalesforceHttpClient extends HttpClient {
     }
 
     public SalesforceHttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory) {
-        super(ofNullable(transport).orElse(new HttpClientTransportOverHTTP()), sslContextFactory);
+        super(ofNullable(transport).orElse(new SalesforceHttpClientTransportOverHTTP()), sslContextFactory);
 
         // Jetty 9.3, as opposed to 9.2 the way to add ProtocolHandler to
         // HttpClient changed in 9.2 HttpClient::getProtocolHandlers returned
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClientTransportOverHTTP.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClientTransportOverHTTP.java
new file mode 100644
index 0000000..336ffcd
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClientTransportOverHTTP.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import org.eclipse.jetty.client.HttpDestination;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.HttpRequest;
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
+import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
+import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.Promise;
+
+/**
+ * A workaround for Fiddler sending <code>Connection: close</code> on HTTPS proxy tunneling via CONNECT.
+ */
+public class SalesforceHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP {
+
+    public class SalesforceHttpChannelOverHTTP extends HttpChannelOverHTTP {
+
+        public SalesforceHttpChannelOverHTTP(final HttpConnectionOverHTTP connection) {
+            super(connection);
+        }
+
+        @Override
+        public void exchangeTerminated(final HttpExchange exchange, final Result result) {
+            final Response response = result.getResponse();
+            final HttpRequest request = exchange.getRequest();
+
+            if (response != null && response.getVersion() != null && response.getVersion().compareTo(HttpVersion.HTTP_1_1) >= 0
+                    && request != null && HttpMethod.CONNECT.is(request.getMethod())) {
+                final HttpFields headers = response.getHeaders();
+                headers.remove(HttpHeader.CONNECTION);
+            }
+
+            super.exchangeTerminated(exchange, result);
+        }
+    }
+
+    public class SalesforceHttpConnectionOverHTTP extends HttpConnectionOverHTTP {
+
+        public SalesforceHttpConnectionOverHTTP(final EndPoint endPoint, final HttpDestination destination,
+                                                final Promise<Connection> promise) {
+            super(endPoint, destination, promise);
+        }
+
+        @Override
+        protected HttpChannelOverHTTP newHttpChannel() {
+            return new SalesforceHttpChannelOverHTTP(this);
+        }
+    }
+
+    @Override
+    protected HttpConnectionOverHTTP newHttpConnection(
+            final EndPoint endPoint, final HttpDestination destination, final Promise<Connection> promise) {
+        return new SalesforceHttpConnectionOverHTTP(endPoint, destination, promise);
+    }
+}