You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dh...@apache.org on 2016/08/25 15:56:58 UTC

[1/2] camel git commit: CAMEL-10238: Restore Cookies and HOST headers in securityhandler for subscriptions

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x ac501457d -> 36f857888


CAMEL-10238: Restore Cookies and HOST headers in securityhandler for subscriptions


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/59b67d2d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/59b67d2d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/59b67d2d

Branch: refs/heads/camel-2.17.x
Commit: 59b67d2d3b56da9fde7e283255b524bad57c3e99
Parents: ac50145
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Thu Aug 25 00:28:58 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Thu Aug 25 08:47:46 2016 -0700

----------------------------------------------------------------------
 .../client/SalesforceSecurityHandler.java       | 36 ++++++++++++++------
 1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/59b67d2d/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
index 8df28de..11c8cbe 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
@@ -30,6 +30,8 @@ import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
@@ -208,24 +210,36 @@ public class SalesforceSecurityHandler implements ProtocolHandler {
             if (copy) {
                 newRequest = httpClient.copyRequest(request, request.getURI());
                 newRequest.method(request.getMethod());
+                HttpFields headers = newRequest.getHeaders();
+                // copy cookies and host for subscriptions to avoid '403::Unknown Client' errors
+                for (HttpField field : request.getHeaders()) {
+                    HttpHeader header = field.getHeader();
+                    if (HttpHeader.COOKIE.equals(header) || HttpHeader.HOST.equals(header)) {
+                        headers.add(header, field.getValue());
+                    }
+                }
             } else {
                 newRequest = request;
             }
 
             conversation.setAttribute(AUTHENTICATION_RETRIES_ATTRIBUTE, ++retries);
 
-            LOG.debug("Retry attempt {} on authentication error for {}", retries, request);
+            Object originalRequest = conversation.getAttribute(AUTHENTICATION_REQUEST_ATTRIBUTE);
+            LOG.debug("Retry attempt {} on authentication error for {}", retries, originalRequest != null ? originalRequest : newRequest);
 
-            // update currentToken
-            String currentToken = session.getAccessToken();
-            if (client != null) {
-                // update client cache for this and future requests
-                client.setAccessToken(currentToken);
-                client.setInstanceUrl(session.getInstanceUrl());
-                client.setAccessToken(newRequest);
-            } else {
-                // plain request not made by an AbstractClientBase
-                newRequest.header(HttpHeader.AUTHORIZATION, "OAuth " + currentToken);
+            // update currentToken for original request
+            if (originalRequest == null) {
+
+                String currentToken = session.getAccessToken();
+                if (client != null) {
+                    // update client cache for this and future requests
+                    client.setAccessToken(currentToken);
+                    client.setInstanceUrl(session.getInstanceUrl());
+                    client.setAccessToken(newRequest);
+                } else {
+                    // plain request not made by an AbstractClientBase
+                    newRequest.header(HttpHeader.AUTHORIZATION, "OAuth " + currentToken);
+                }
             }
 
             // send new async request with a new delegate


[2/2] camel git commit: CAMEL-10238: Updated subscription helper to listen for hard disconnects, and reconnect from scratch, also updated error handling throwing SalesforceException from consumer endpoints

Posted by dh...@apache.org.
CAMEL-10238: Updated subscription helper to listen for hard disconnects, and reconnect from scratch, also updated error handling throwing SalesforceException from consumer endpoints

Conflicts:
	components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f85788
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f85788
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f85788

Branch: refs/heads/camel-2.17.x
Commit: 36f857888bbfc07bdcc2908d3bcbd5df01aaa03f
Parents: 59b67d2
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Thu Aug 25 01:20:07 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Thu Aug 25 08:56:42 2016 -0700

----------------------------------------------------------------------
 .../salesforce/SalesforceConsumer.java          |   9 +-
 .../internal/streaming/SubscriptionHelper.java  | 237 +++++++++++++------
 2 files changed, 172 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index e6434a5..e5c3075 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.salesforce.internal.PayloadFormat;
+import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
 import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.apache.camel.component.salesforce.internal.streaming.PushTopicHelper;
@@ -172,7 +173,7 @@ public class SalesforceConsumer extends DefaultConsumer {
         } catch (IOException e) {
             final String msg = String.format("Error parsing message [%s] from Topic %s: %s",
                     message, topicName, e.getMessage());
-            handleException(msg, new RuntimeCamelException(msg, e));
+            handleException(msg, new SalesforceException(msg, e));
         }
 
         try {
@@ -186,11 +187,13 @@ public class SalesforceConsumer extends DefaultConsumer {
                 }
             });
         } catch (Exception e) {
-            handleException(String.format("Error processing %s: %s", exchange, e.getMessage()), e);
+            String msg = String.format("Error processing %s: %s", exchange, e);
+            handleException(msg, new SalesforceException(msg, e));
         } finally {
             Exception ex = exchange.getException();
             if (ex != null) {
-                handleException(String.format("Unhandled exception: %s", ex.getMessage()), ex);
+                String msg = String.format("Unhandled exception: %s", ex.getMessage());
+                handleException(msg, new SalesforceException(msg, ex));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 1cc4a21..1e3e8a4 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -28,6 +28,7 @@ import org.apache.camel.CamelException;
 import org.apache.camel.component.salesforce.SalesforceComponent;
 import org.apache.camel.component.salesforce.SalesforceConsumer;
 import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.ServiceSupport;
 import org.cometd.bayeux.Message;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.cometd.bayeux.Channel.META_CONNECT;
+import static org.cometd.bayeux.Channel.META_DISCONNECT;
 import static org.cometd.bayeux.Channel.META_HANDSHAKE;
 import static org.cometd.bayeux.Channel.META_SUBSCRIBE;
 import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE;
@@ -54,7 +56,9 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final int CONNECT_TIMEOUT = 110;
     private static final int CHANNEL_TIMEOUT = 40;
 
+    private static final String FAILURE_FIELD = "failure";
     private static final String EXCEPTION_FIELD = "exception";
+    private static final int DISCONNECT_INTERVAL = 5000;
 
     private final SalesforceComponent component;
     private final SalesforceSession session;
@@ -65,11 +69,14 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
+    private ClientSessionChannel.MessageListener disconnectListener;
 
-    private String handshakeError;
-    private Exception handshakeException;
-    private String connectError;
-    private boolean reconnecting;
+    private volatile String handshakeError;
+    private volatile Exception handshakeException;
+    private volatile String connectError;
+    private volatile Exception connectException;
+
+    private volatile boolean reconnecting;
 
     public SubscriptionHelper(SalesforceComponent component) throws Exception {
         this.component = component;
@@ -83,6 +90,13 @@ public class SubscriptionHelper extends ServiceSupport {
 
     @Override
     protected void doStart() throws Exception {
+
+        // reset all error conditions
+        handshakeError = null;
+        handshakeException = null;
+        connectError = null;
+        connectException = null;
+
         // listener for handshake error or exception
         if (handshakeListener == null) {
             // first start
@@ -91,14 +105,9 @@ public class SubscriptionHelper extends ServiceSupport {
                     LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
 
                     if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            handshakeError = error;
-                        }
-                        Exception exception = (Exception) message.get(EXCEPTION_FIELD);
-                        if (exception != null) {
-                            handshakeException = exception;
-                        }
+                        LOG.warn("Handshake failure: {}", message);
+                        handshakeError = (String) message.get(ERROR_FIELD);
+                        handshakeException = getFailure(message);
                     } else if (!listenerMap.isEmpty()) {
                         reconnecting = true;
                     }
@@ -114,10 +123,22 @@ public class SubscriptionHelper extends ServiceSupport {
                     LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
 
                     if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            connectError = error;
+
+                        LOG.warn("Connect failure: {}", message);
+                        connectError = (String) message.get(ERROR_FIELD);
+                        connectException = getFailure(message);
+
+                        if (connectError != null) {
+                            // refresh oauth token, if it's a 401 error
+                            if (connectError.startsWith("401::")) {
+                                try {
+                                    session.login(null);
+                                } catch (SalesforceException e) {
+                                    LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e);
+                                }
+                            }
                         }
+
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -131,15 +152,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
                             final SalesforceConsumer consumer = entry.getKey();
                             final String topicName = consumer.getTopicName();
-                            try {
-                                subscribe(topicName, consumer);
-                            } catch (CamelException e) {
-                                // let the consumer handle the exception
-                                consumer.handleException(
-                                        String.format("Error refreshing subscription to topic [%s]: %s",
-                                                topicName, e.getMessage()),
-                                        e);
-                            }
+                            subscribe(topicName, consumer);
                         }
 
                     }
@@ -148,6 +161,84 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_CONNECT).addListener(connectListener);
 
+        // handle fatal disconnects by reconnecting asynchronously
+        if (disconnectListener == null) {
+            disconnectListener = new ClientSessionChannel.MessageListener() {
+                @Override
+                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
+
+                    // launch an async task to reconnect
+                    final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
+
+                    httpClient.getExecutor().execute(new Runnable() {
+                        @Override
+                        public void run() {
+
+                            boolean abort = false;
+                            // wait for disconnect
+                            while (!client.isDisconnected()) {
+                                try {
+                                    Thread.sleep(DISCONNECT_INTERVAL);
+                                } catch (InterruptedException e) {
+                                    LOG.error("Aborting reconnect on interrupt!");
+                                    abort = true;
+                                }
+                            }
+
+                            if (!abort) {
+
+                                LOG.info("Reconnecting on unexpected disconnect from Salesforce...");
+                                final long backoffIncrement = client.getBackoffIncrement();
+                                final long maxBackoff = client.getMaxBackoff();
+
+                                long backoff = backoffIncrement;
+                                String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff);
+                                Exception lastError = new SalesforceException(msg, null);
+
+                                // retry until interrupted, or handshook or connect backoff exceeded
+                                while (!abort && !client.isHandshook() && backoff < maxBackoff) {
+
+                                    try {
+                                        // reset client
+                                        doStop();
+
+                                        // register listeners and restart
+                                        doStart();
+
+                                    } catch (Exception e) {
+                                        LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e);
+                                        lastError = e;
+                                    }
+
+                                    if (!client.isHandshook()) {
+                                        LOG.debug("Pausing for {} msecs after reconnect failure", backoff);
+                                        try {
+                                            Thread.sleep(backoff);
+                                        } catch (InterruptedException e) {
+                                            LOG.error("Aborting reconnect on interrupt!");
+                                            abort = true;
+                                        }
+                                        backoff += backoffIncrement;
+                                    }
+                                }
+
+                                if (client.isHandshook()) {
+                                    LOG.info("Successfully reconnected to Salesforce!");
+                                } else if (!abort) {
+                                    // notify all consumers
+                                    String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage();
+                                    for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                                        consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError));
+                                    }
+                                }
+                            }
+                        }
+                    });
+                }
+            };
+        }
+        client.getChannel(META_DISCONNECT).addListener(disconnectListener);
+
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -159,6 +250,10 @@ public class SubscriptionHelper extends ServiceSupport {
                         handshakeException);
             } else if (handshakeError != null) {
                 throw new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError));
+            } else if (connectException != null) {
+                throw new CamelException(
+                        String.format("Exception during CONNECT: %s", connectException.getMessage()),
+                        connectException);
             } else if (connectError != null) {
                 throw new CamelException(String.format("Error during CONNECT: %s", connectError));
             } else {
@@ -168,8 +263,20 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Exception getFailure(Message message) {
+        Exception exception = null;
+        if (message.get(EXCEPTION_FIELD) != null) {
+            exception = (Exception) message.get(EXCEPTION_FIELD);
+        } else if (message.get(FAILURE_FIELD) != null) {
+            exception = (Exception) ((Map<String, Object>)message.get("failure")).get("exception");
+        }
+        return exception;
+    }
+
     @Override
     protected void doStop() throws Exception {
+        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
         client.getChannel(META_CONNECT).removeListener(connectListener);
         client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
 
@@ -198,7 +305,8 @@ public class SubscriptionHelper extends ServiceSupport {
                 super.customize(request);
 
                 // add current security token obtained from session
-                request.header(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
+                // replace old token
+                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
             }
         };
 
@@ -206,7 +314,7 @@ public class SubscriptionHelper extends ServiceSupport {
         return client;
     }
 
-    public void subscribe(final String topicName, final SalesforceConsumer consumer) throws CamelException {
+    public void subscribe(final String topicName, final SalesforceConsumer consumer) {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
 
@@ -225,9 +333,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
         final ClientSessionChannel clientChannel = client.getChannel(channelName);
 
-        // listener for subscribe error
-        final CountDownLatch latch = new CountDownLatch(1);
-        final String[] subscribeError = {null};
+        // listener for subscription
         final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() {
             public void onMessage(ClientSessionChannel channel, Message message) {
                 LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
@@ -236,45 +342,28 @@ public class SubscriptionHelper extends ServiceSupport {
 
                     if (!message.isSuccessful()) {
                         String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            subscribeError[0] = error;
+                        if (error == null) {
+                            error = "Missing error message";
                         }
+                        Exception failure = getFailure(message);
+                        String msg = String.format("Error subscribing to %s: %s", topicName,
+                            failure != null ? failure.getMessage() : error);
+                        consumer.handleException(msg, new SalesforceException(msg, failure));
                     } else {
                         // remember subscription
                         LOG.info("Subscribed to channel {}", subscribedChannelName);
+                        listenerMap.put(consumer, listener);
                     }
-                    latch.countDown();
+
+                    // remove this subscription listener
+                    client.getChannel(META_SUBSCRIBE).removeListener(this);
                 }
             }
         };
         client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
 
-        try {
-            clientChannel.subscribe(listener);
-
-            // confirm that a subscription was created
-            try {
-                if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
-                    String message;
-                    if (subscribeError[0] != null) {
-                        message = String.format("Error subscribing to topic %s: %s",
-                                topicName, subscribeError[0]);
-                    } else {
-                        message = String.format("Timeout error subscribing to topic %s after %s seconds",
-                                topicName, CHANNEL_TIMEOUT);
-                    }
-                    throw new CamelException(message);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                // probably shutting down, so forget subscription
-            }
-
-            listenerMap.put(consumer, listener);
-
-        } finally {
-            client.getChannel(META_SUBSCRIBE).removeListener(subscriptionListener);
-        }
+        // subscribe asynchronously
+        clientChannel.subscribe(listener);
     }
 
     private String getChannelName(String topicName) {
@@ -289,22 +378,25 @@ public class SubscriptionHelper extends ServiceSupport {
         // listen for unsubscribe error
         final CountDownLatch latch = new CountDownLatch(1);
         final String[] unsubscribeError = {null};
+        final Exception[] unsubscribeFailure = {null};
+
         final ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener() {
             public void onMessage(ClientSessionChannel channel, Message message) {
                 LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
-                String unsubscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString();
-                if (channelName.equals(unsubscribedChannelName)) {
-
-                    if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            unsubscribeError[0] = error;
+                Object subscription = message.get(SUBSCRIPTION_FIELD);
+                if (subscription != null) {
+                    String unsubscribedChannelName = subscription.toString();
+                    if (channelName.equals(unsubscribedChannelName)) {
+
+                        if (!message.isSuccessful()) {
+                            unsubscribeError[0] = (String) message.get(ERROR_FIELD);
+                            unsubscribeFailure[0] = getFailure(message);
+                        } else {
+                            // forget subscription
+                            LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
                         }
-                    } else {
-                        // forget subscription
-                        LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
+                        latch.countDown();
                     }
-                    latch.countDown();
                 }
             }
         };
@@ -323,14 +415,17 @@ public class SubscriptionHelper extends ServiceSupport {
                 try {
                     if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
                         String message;
-                        if (unsubscribeError[0] != null) {
+                        if (unsubscribeFailure[0] != null) {
+                            message = String.format("Error unsubscribing from topic %s: %s",
+                                topicName, unsubscribeFailure[0].getMessage());
+                        } else if (unsubscribeError[0] != null) {
                             message = String.format("Error unsubscribing from topic %s: %s",
                                     topicName, unsubscribeError[0]);
                         } else {
                             message = String.format("Timeout error unsubscribing from topic %s after %s seconds",
                                     topicName, CHANNEL_TIMEOUT);
                         }
-                        throw new CamelException(message);
+                        throw new CamelException(message, unsubscribeFailure[0]);
                     }
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();