You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dh...@apache.org on 2016/08/25 15:56:58 UTC
[1/2] camel git commit: CAMEL-10238: Restore Cookies and HOST headers
in securityhandler for subscriptions
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x ac501457d -> 36f857888
CAMEL-10238: Restore Cookies and HOST headers in securityhandler for subscriptions
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/59b67d2d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/59b67d2d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/59b67d2d
Branch: refs/heads/camel-2.17.x
Commit: 59b67d2d3b56da9fde7e283255b524bad57c3e99
Parents: ac50145
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Thu Aug 25 00:28:58 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Thu Aug 25 08:47:46 2016 -0700
----------------------------------------------------------------------
.../client/SalesforceSecurityHandler.java | 36 ++++++++++++++------
1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/59b67d2d/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
index 8df28de..11c8cbe 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/SalesforceSecurityHandler.java
@@ -30,6 +30,8 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
@@ -208,24 +210,36 @@ public class SalesforceSecurityHandler implements ProtocolHandler {
if (copy) {
newRequest = httpClient.copyRequest(request, request.getURI());
newRequest.method(request.getMethod());
+ HttpFields headers = newRequest.getHeaders();
+ // copy cookies and host for subscriptions to avoid '403::Unknown Client' errors
+ for (HttpField field : request.getHeaders()) {
+ HttpHeader header = field.getHeader();
+ if (HttpHeader.COOKIE.equals(header) || HttpHeader.HOST.equals(header)) {
+ headers.add(header, field.getValue());
+ }
+ }
} else {
newRequest = request;
}
conversation.setAttribute(AUTHENTICATION_RETRIES_ATTRIBUTE, ++retries);
- LOG.debug("Retry attempt {} on authentication error for {}", retries, request);
+ Object originalRequest = conversation.getAttribute(AUTHENTICATION_REQUEST_ATTRIBUTE);
+ LOG.debug("Retry attempt {} on authentication error for {}", retries, originalRequest != null ? originalRequest : newRequest);
- // update currentToken
- String currentToken = session.getAccessToken();
- if (client != null) {
- // update client cache for this and future requests
- client.setAccessToken(currentToken);
- client.setInstanceUrl(session.getInstanceUrl());
- client.setAccessToken(newRequest);
- } else {
- // plain request not made by an AbstractClientBase
- newRequest.header(HttpHeader.AUTHORIZATION, "OAuth " + currentToken);
+ // update currentToken for original request
+ if (originalRequest == null) {
+
+ String currentToken = session.getAccessToken();
+ if (client != null) {
+ // update client cache for this and future requests
+ client.setAccessToken(currentToken);
+ client.setInstanceUrl(session.getInstanceUrl());
+ client.setAccessToken(newRequest);
+ } else {
+ // plain request not made by an AbstractClientBase
+ newRequest.header(HttpHeader.AUTHORIZATION, "OAuth " + currentToken);
+ }
}
// send new async request with a new delegate
[2/2] camel git commit: CAMEL-10238: Updated subscription helper to
listen for hard disconnects, and reconnect from scratch,
also updated error handling throwing SalesforceException from consumer
endpoints
Posted by dh...@apache.org.
CAMEL-10238: Updated the subscription helper to listen for hard disconnects and reconnect from scratch; also updated error handling to throw SalesforceException from consumer endpoints.
Conflicts:
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f85788
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f85788
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f85788
Branch: refs/heads/camel-2.17.x
Commit: 36f857888bbfc07bdcc2908d3bcbd5df01aaa03f
Parents: 59b67d2
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Thu Aug 25 01:20:07 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Thu Aug 25 08:56:42 2016 -0700
----------------------------------------------------------------------
.../salesforce/SalesforceConsumer.java | 9 +-
.../internal/streaming/SubscriptionHelper.java | 237 +++++++++++++------
2 files changed, 172 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index e6434a5..e5c3075 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.salesforce.internal.PayloadFormat;
+import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
import org.apache.camel.component.salesforce.internal.client.RestClient;
import org.apache.camel.component.salesforce.internal.streaming.PushTopicHelper;
@@ -172,7 +173,7 @@ public class SalesforceConsumer extends DefaultConsumer {
} catch (IOException e) {
final String msg = String.format("Error parsing message [%s] from Topic %s: %s",
message, topicName, e.getMessage());
- handleException(msg, new RuntimeCamelException(msg, e));
+ handleException(msg, new SalesforceException(msg, e));
}
try {
@@ -186,11 +187,13 @@ public class SalesforceConsumer extends DefaultConsumer {
}
});
} catch (Exception e) {
- handleException(String.format("Error processing %s: %s", exchange, e.getMessage()), e);
+ String msg = String.format("Error processing %s: %s", exchange, e);
+ handleException(msg, new SalesforceException(msg, e));
} finally {
Exception ex = exchange.getException();
if (ex != null) {
- handleException(String.format("Unhandled exception: %s", ex.getMessage()), ex);
+ String msg = String.format("Unhandled exception: %s", ex.getMessage());
+ handleException(msg, new SalesforceException(msg, ex));
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 1cc4a21..1e3e8a4 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -28,6 +28,7 @@ import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.ServiceSupport;
import org.cometd.bayeux.Message;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.cometd.bayeux.Channel.META_CONNECT;
+import static org.cometd.bayeux.Channel.META_DISCONNECT;
import static org.cometd.bayeux.Channel.META_HANDSHAKE;
import static org.cometd.bayeux.Channel.META_SUBSCRIBE;
import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE;
@@ -54,7 +56,9 @@ public class SubscriptionHelper extends ServiceSupport {
private static final int CONNECT_TIMEOUT = 110;
private static final int CHANNEL_TIMEOUT = 40;
+ private static final String FAILURE_FIELD = "failure";
private static final String EXCEPTION_FIELD = "exception";
+ private static final int DISCONNECT_INTERVAL = 5000;
private final SalesforceComponent component;
private final SalesforceSession session;
@@ -65,11 +69,14 @@ public class SubscriptionHelper extends ServiceSupport {
private ClientSessionChannel.MessageListener handshakeListener;
private ClientSessionChannel.MessageListener connectListener;
+ private ClientSessionChannel.MessageListener disconnectListener;
- private String handshakeError;
- private Exception handshakeException;
- private String connectError;
- private boolean reconnecting;
+ private volatile String handshakeError;
+ private volatile Exception handshakeException;
+ private volatile String connectError;
+ private volatile Exception connectException;
+
+ private volatile boolean reconnecting;
public SubscriptionHelper(SalesforceComponent component) throws Exception {
this.component = component;
@@ -83,6 +90,13 @@ public class SubscriptionHelper extends ServiceSupport {
@Override
protected void doStart() throws Exception {
+
+ // reset all error conditions
+ handshakeError = null;
+ handshakeException = null;
+ connectError = null;
+ connectException = null;
+
// listener for handshake error or exception
if (handshakeListener == null) {
// first start
@@ -91,14 +105,9 @@ public class SubscriptionHelper extends ServiceSupport {
LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
if (!message.isSuccessful()) {
- String error = (String) message.get(ERROR_FIELD);
- if (error != null) {
- handshakeError = error;
- }
- Exception exception = (Exception) message.get(EXCEPTION_FIELD);
- if (exception != null) {
- handshakeException = exception;
- }
+ LOG.warn("Handshake failure: {}", message);
+ handshakeError = (String) message.get(ERROR_FIELD);
+ handshakeException = getFailure(message);
} else if (!listenerMap.isEmpty()) {
reconnecting = true;
}
@@ -114,10 +123,22 @@ public class SubscriptionHelper extends ServiceSupport {
LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
if (!message.isSuccessful()) {
- String error = (String) message.get(ERROR_FIELD);
- if (error != null) {
- connectError = error;
+
+ LOG.warn("Connect failure: {}", message);
+ connectError = (String) message.get(ERROR_FIELD);
+ connectException = getFailure(message);
+
+ if (connectError != null) {
+ // refresh oauth token, if it's a 401 error
+ if (connectError.startsWith("401::")) {
+ try {
+ session.login(null);
+ } catch (SalesforceException e) {
+ LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e);
+ }
+ }
}
+
} else if (reconnecting) {
reconnecting = false;
@@ -131,15 +152,7 @@ public class SubscriptionHelper extends ServiceSupport {
for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
final SalesforceConsumer consumer = entry.getKey();
final String topicName = consumer.getTopicName();
- try {
- subscribe(topicName, consumer);
- } catch (CamelException e) {
- // let the consumer handle the exception
- consumer.handleException(
- String.format("Error refreshing subscription to topic [%s]: %s",
- topicName, e.getMessage()),
- e);
- }
+ subscribe(topicName, consumer);
}
}
@@ -148,6 +161,84 @@ public class SubscriptionHelper extends ServiceSupport {
}
client.getChannel(META_CONNECT).addListener(connectListener);
+ // handle fatal disconnects by reconnecting asynchronously
+ if (disconnectListener == null) {
+ disconnectListener = new ClientSessionChannel.MessageListener() {
+ @Override
+ public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
+
+ // launch an async task to reconnect
+ final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
+
+ httpClient.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+
+ boolean abort = false;
+ // wait for disconnect
+ while (!client.isDisconnected()) {
+ try {
+ Thread.sleep(DISCONNECT_INTERVAL);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting reconnect on interrupt!");
+ abort = true;
+ }
+ }
+
+ if (!abort) {
+
+ LOG.info("Reconnecting on unexpected disconnect from Salesforce...");
+ final long backoffIncrement = client.getBackoffIncrement();
+ final long maxBackoff = client.getMaxBackoff();
+
+ long backoff = backoffIncrement;
+ String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff);
+ Exception lastError = new SalesforceException(msg, null);
+
+ // retry until interrupted, or handshook or connect backoff exceeded
+ while (!abort && !client.isHandshook() && backoff < maxBackoff) {
+
+ try {
+ // reset client
+ doStop();
+
+ // register listeners and restart
+ doStart();
+
+ } catch (Exception e) {
+ LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e);
+ lastError = e;
+ }
+
+ if (!client.isHandshook()) {
+ LOG.debug("Pausing for {} msecs after reconnect failure", backoff);
+ try {
+ Thread.sleep(backoff);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting reconnect on interrupt!");
+ abort = true;
+ }
+ backoff += backoffIncrement;
+ }
+ }
+
+ if (client.isHandshook()) {
+ LOG.info("Successfully reconnected to Salesforce!");
+ } else if (!abort) {
+ // notify all consumers
+ String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage();
+ for (SalesforceConsumer consumer : listenerMap.keySet()) {
+ consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError));
+ }
+ }
+ }
+ }
+ });
+ }
+ };
+ }
+ client.getChannel(META_DISCONNECT).addListener(disconnectListener);
+
// connect to Salesforce cometd endpoint
client.handshake();
@@ -159,6 +250,10 @@ public class SubscriptionHelper extends ServiceSupport {
handshakeException);
} else if (handshakeError != null) {
throw new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError));
+ } else if (connectException != null) {
+ throw new CamelException(
+ String.format("Exception during CONNECT: %s", connectException.getMessage()),
+ connectException);
} else if (connectError != null) {
throw new CamelException(String.format("Error during CONNECT: %s", connectError));
} else {
@@ -168,8 +263,20 @@ public class SubscriptionHelper extends ServiceSupport {
}
}
+ @SuppressWarnings("unchecked")
+ private Exception getFailure(Message message) {
+ Exception exception = null;
+ if (message.get(EXCEPTION_FIELD) != null) {
+ exception = (Exception) message.get(EXCEPTION_FIELD);
+ } else if (message.get(FAILURE_FIELD) != null) {
+ exception = (Exception) ((Map<String, Object>)message.get("failure")).get("exception");
+ }
+ return exception;
+ }
+
@Override
protected void doStop() throws Exception {
+ client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
client.getChannel(META_CONNECT).removeListener(connectListener);
client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
@@ -198,7 +305,8 @@ public class SubscriptionHelper extends ServiceSupport {
super.customize(request);
// add current security token obtained from session
- request.header(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
+ // replace old token
+ request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
}
};
@@ -206,7 +314,7 @@ public class SubscriptionHelper extends ServiceSupport {
return client;
}
- public void subscribe(final String topicName, final SalesforceConsumer consumer) throws CamelException {
+ public void subscribe(final String topicName, final SalesforceConsumer consumer) {
// create subscription for consumer
final String channelName = getChannelName(topicName);
@@ -225,9 +333,7 @@ public class SubscriptionHelper extends ServiceSupport {
final ClientSessionChannel clientChannel = client.getChannel(channelName);
- // listener for subscribe error
- final CountDownLatch latch = new CountDownLatch(1);
- final String[] subscribeError = {null};
+ // listener for subscription
final ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener() {
public void onMessage(ClientSessionChannel channel, Message message) {
LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
@@ -236,45 +342,28 @@ public class SubscriptionHelper extends ServiceSupport {
if (!message.isSuccessful()) {
String error = (String) message.get(ERROR_FIELD);
- if (error != null) {
- subscribeError[0] = error;
+ if (error == null) {
+ error = "Missing error message";
}
+ Exception failure = getFailure(message);
+ String msg = String.format("Error subscribing to %s: %s", topicName,
+ failure != null ? failure.getMessage() : error);
+ consumer.handleException(msg, new SalesforceException(msg, failure));
} else {
// remember subscription
LOG.info("Subscribed to channel {}", subscribedChannelName);
+ listenerMap.put(consumer, listener);
}
- latch.countDown();
+
+ // remove this subscription listener
+ client.getChannel(META_SUBSCRIBE).removeListener(this);
}
}
};
client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
- try {
- clientChannel.subscribe(listener);
-
- // confirm that a subscription was created
- try {
- if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
- String message;
- if (subscribeError[0] != null) {
- message = String.format("Error subscribing to topic %s: %s",
- topicName, subscribeError[0]);
- } else {
- message = String.format("Timeout error subscribing to topic %s after %s seconds",
- topicName, CHANNEL_TIMEOUT);
- }
- throw new CamelException(message);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // probably shutting down, so forget subscription
- }
-
- listenerMap.put(consumer, listener);
-
- } finally {
- client.getChannel(META_SUBSCRIBE).removeListener(subscriptionListener);
- }
+ // subscribe asynchronously
+ clientChannel.subscribe(listener);
}
private String getChannelName(String topicName) {
@@ -289,22 +378,25 @@ public class SubscriptionHelper extends ServiceSupport {
// listen for unsubscribe error
final CountDownLatch latch = new CountDownLatch(1);
final String[] unsubscribeError = {null};
+ final Exception[] unsubscribeFailure = {null};
+
final ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener() {
public void onMessage(ClientSessionChannel channel, Message message) {
LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
- String unsubscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString();
- if (channelName.equals(unsubscribedChannelName)) {
-
- if (!message.isSuccessful()) {
- String error = (String) message.get(ERROR_FIELD);
- if (error != null) {
- unsubscribeError[0] = error;
+ Object subscription = message.get(SUBSCRIPTION_FIELD);
+ if (subscription != null) {
+ String unsubscribedChannelName = subscription.toString();
+ if (channelName.equals(unsubscribedChannelName)) {
+
+ if (!message.isSuccessful()) {
+ unsubscribeError[0] = (String) message.get(ERROR_FIELD);
+ unsubscribeFailure[0] = getFailure(message);
+ } else {
+ // forget subscription
+ LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
}
- } else {
- // forget subscription
- LOG.info("Unsubscribed from channel {}", unsubscribedChannelName);
+ latch.countDown();
}
- latch.countDown();
}
}
};
@@ -323,14 +415,17 @@ public class SubscriptionHelper extends ServiceSupport {
try {
if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
String message;
- if (unsubscribeError[0] != null) {
+ if (unsubscribeFailure[0] != null) {
+ message = String.format("Error unsubscribing from topic %s: %s",
+ topicName, unsubscribeFailure[0].getMessage());
+ } else if (unsubscribeError[0] != null) {
message = String.format("Error unsubscribing from topic %s: %s",
topicName, unsubscribeError[0]);
} else {
message = String.format("Timeout error unsubscribing from topic %s after %s seconds",
topicName, CHANNEL_TIMEOUT);
}
- throw new CamelException(message);
+ throw new CamelException(message, unsubscribeFailure[0]);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();