You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dh...@apache.org on 2016/08/30 05:33:45 UTC
[1/2] camel git commit: CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties
Repository: camel
Updated Branches:
refs/heads/master c07557607 -> 0b15168cd
CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d97e5b5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d97e5b5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d97e5b5
Branch: refs/heads/master
Commit: 7d97e5b5800969b74e805e66a94a6c5323bfeafc
Parents: c075576
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Mon Aug 29 22:32:56 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Mon Aug 29 22:32:56 2016 -0700
----------------------------------------------------------------------
.../salesforce/SalesforceComponent.java | 24 +++
.../salesforce/SalesforceEndpointConfig.java | 36 +++-
.../internal/streaming/SubscriptionHelper.java | 165 +++++++++++--------
.../src/test/resources/log4j2.properties | 5 +
4 files changed, 158 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 76f7012..7b7b81a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -1060,4 +1060,28 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin
public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
getConfigOrCreate().setInitialReplayIdMap(initialReplayIdMap);
}
+
+ public long getBackoffIncrement() {
+ return getConfigOrCreate().getBackoffIncrement();
+ }
+
+ /**
+ * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+ * @param backoffIncrement
+ */
+ public void setBackoffIncrement(long backoffIncrement) {
+ getConfigOrCreate().setBackoffIncrement(backoffIncrement);
+ }
+
+ public long getMaxBackoff() {
+ return getConfigOrCreate().getMaxBackoff();
+ }
+
+ /**
+ * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+ * @param maxBackoff
+ */
+ public void setMaxBackoff(long maxBackoff) {
+ getConfigOrCreate().setMaxBackoff(maxBackoff);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 39d0d06..ad18aab 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -79,6 +79,10 @@ public class SalesforceEndpointConfig implements Cloneable {
// default maximum authentication retries on failed authentication or expired session
public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
+ // default increment and limit for Streaming connection restart attempts
+ public static final long DEFAULT_BACKOFF_INCREMENT = 1000L;
+ public static final long DEFAULT_MAX_BACKOFF = 30000L;
+
// general properties
@UriParam
private String apiVersion = DEFAULT_VERSION;
@@ -161,6 +165,14 @@ public class SalesforceEndpointConfig implements Cloneable {
@UriParam
private ObjectMapper objectMapper;
+ // Streaming connection restart attempt backoff interval increment
+ @UriParam
+ private long backoffIncrement = DEFAULT_BACKOFF_INCREMENT;
+
+ // Streaming connection restart attempt maximum backoff interval
+ @UriParam
+ private long maxBackoff = DEFAULT_MAX_BACKOFF;
+
public SalesforceEndpointConfig copy() {
try {
final SalesforceEndpointConfig copy = (SalesforceEndpointConfig) super.clone();
@@ -505,6 +517,28 @@ public class SalesforceEndpointConfig implements Cloneable {
return objectMapper;
}
+ public long getBackoffIncrement() {
+ return backoffIncrement;
+ }
+
+ /**
+ * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+ */
+ public void setBackoffIncrement(long backoffIncrement) {
+ this.backoffIncrement = backoffIncrement;
+ }
+
+ public long getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ /**
+ * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+ */
+ public void setMaxBackoff(long maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ }
+
/**
* Custom Jackson ObjectMapper to use when serializing/deserializing Salesforce objects.
*/
@@ -538,7 +572,7 @@ public class SalesforceEndpointConfig implements Cloneable {
valueMap.put(JOB_ID, jobId);
valueMap.put(BATCH_ID, batchId);
valueMap.put(RESULT_ID, resultId);
-
+
// add analytics API properties
valueMap.put(REPORT_ID, reportId);
valueMap.put(INCLUDE_DETAILS, includeDetails);
http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index c9a98ee..ed4c152 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -70,6 +71,8 @@ public class SubscriptionHelper extends ServiceSupport {
private final long timeout = 60 * 1000L;
private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
+ private final long maxBackoff;
+ private final long backoffIncrement;
private ClientSessionChannel.MessageListener handshakeListener;
private ClientSessionChannel.MessageListener connectListener;
@@ -81,6 +84,7 @@ public class SubscriptionHelper extends ServiceSupport {
private volatile Exception connectException;
private volatile boolean reconnecting;
+ private final AtomicLong restartBackoff;
public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception {
this.component = component;
@@ -90,6 +94,10 @@ public class SubscriptionHelper extends ServiceSupport {
// create CometD client
this.client = createClient(topicName);
+
+ restartBackoff = new AtomicLong(0);
+ backoffIncrement = component.getConfig().getBackoffIncrement();
+ maxBackoff = component.getConfig().getMaxBackoff();
}
@Override
@@ -112,6 +120,10 @@ public class SubscriptionHelper extends ServiceSupport {
LOG.warn("Handshake failure: {}", message);
handshakeError = (String) message.get(ERROR_FIELD);
handshakeException = getFailure(message);
+
+ // restart if handshake fails for any reason
+ restartClient();
+
} else if (!listenerMap.isEmpty()) {
reconnecting = true;
}
@@ -133,12 +145,12 @@ public class SubscriptionHelper extends ServiceSupport {
connectException = getFailure(message);
if (connectError != null) {
- // refresh oauth token, if it's a 401 error
- if (connectError.startsWith("401::")) {
+ // refresh oauth token, if it's a 403 error
+ if (connectError.startsWith("403::")) {
try {
session.login(null);
} catch (SalesforceException e) {
- LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e);
+ LOG.error("Error renewing OAuth token on Connect 403: " + e.getMessage(), e);
}
}
}
@@ -170,74 +182,7 @@ public class SubscriptionHelper extends ServiceSupport {
disconnectListener = new ClientSessionChannel.MessageListener() {
@Override
public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
-
- // launch an async task to reconnect
- final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
-
- httpClient.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
-
- boolean abort = false;
- // wait for disconnect
- while (!client.isDisconnected()) {
- try {
- Thread.sleep(DISCONNECT_INTERVAL);
- } catch (InterruptedException e) {
- LOG.error("Aborting reconnect on interrupt!");
- abort = true;
- }
- }
-
- if (!abort) {
-
- LOG.info("Reconnecting on unexpected disconnect from Salesforce...");
- final long backoffIncrement = client.getBackoffIncrement();
- final long maxBackoff = client.getMaxBackoff();
-
- long backoff = backoffIncrement;
- String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff);
- Exception lastError = new SalesforceException(msg, null);
-
- // retry until interrupted, or handshook or connect backoff exceeded
- while (!abort && !client.isHandshook() && backoff < maxBackoff) {
-
- try {
- // reset client
- doStop();
-
- // register listeners and restart
- doStart();
-
- } catch (Exception e) {
- LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e);
- lastError = e;
- }
-
- if (!client.isHandshook()) {
- LOG.debug("Pausing for {} msecs after reconnect failure", backoff);
- try {
- Thread.sleep(backoff);
- } catch (InterruptedException e) {
- LOG.error("Aborting reconnect on interrupt!");
- abort = true;
- }
- backoff += backoffIncrement;
- }
- }
-
- if (client.isHandshook()) {
- LOG.info("Successfully reconnected to Salesforce!");
- } else if (!abort) {
- // notify all consumers
- String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage();
- for (SalesforceConsumer consumer : listenerMap.keySet()) {
- consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError));
- }
- }
- }
- }
- });
+ restartClient();
}
};
}
@@ -267,6 +212,84 @@ public class SubscriptionHelper extends ServiceSupport {
}
}
+ // launch an async task to restart
+ private void restartClient() {
+
+ // launch a new restart command
+ final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
+ httpClient.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+
+ LOG.info("Restarting on unexpected disconnect from Salesforce...");
+ boolean abort = false;
+
+ // wait for disconnect
+ LOG.debug("Waiting to disconnect...");
+ while (!client.isDisconnected()) {
+ try {
+ Thread.sleep(DISCONNECT_INTERVAL);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting restart on interrupt!");
+ abort = true;
+ }
+ }
+
+ if (!abort) {
+
+ // update restart attempt backoff
+ final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+ if (backoff > maxBackoff) {
+ LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+ abort = true;
+ } else {
+
+ // pause before restart attempt
+ LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+ try {
+ Thread.sleep(backoff);
+ } catch (InterruptedException e) {
+ LOG.error("Aborting restart on interrupt!");
+ abort = true;
+ }
+ }
+
+ if (!abort) {
+ Exception lastError = new SalesforceException("Unknown error", null);
+ try {
+ // reset client
+ doStop();
+
+ // register listeners and restart
+ doStart();
+
+ } catch (Exception e) {
+ LOG.error("Error restarting: " + e.getMessage(), e);
+ lastError = e;
+ }
+
+ if (client.isHandshook()) {
+ LOG.info("Successfully restarted!");
+ // reset backoff interval
+ restartBackoff.set(client.getBackoffIncrement());
+ } else {
+ LOG.error("Failed to restart after pausing for {} msecs", backoff);
+ if ((backoff + backoffIncrement) > maxBackoff) {
+ // notify all consumers
+ String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+ SalesforceException ex = new SalesforceException(abortMsg, lastError);
+ for (SalesforceConsumer consumer : listenerMap.keySet()) {
+ consumer.handleException(abortMsg, ex);
+ }
+ }
+ }
+ }
+ }
+
+ }
+ });
+ }
+
@SuppressWarnings("unchecked")
private Exception getFailure(Message message) {
Exception exception = null;
http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
index ba27a06..4b5b208 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
@@ -26,3 +26,8 @@ appender.out.layout.type = PatternLayout
appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = file
+
+logger.salesforce.name = org.apache.camel.component.salesforce
+logger.salesforce.level = TRACE
+#logger.httpclient.name = org.eclipse.jetty
+#logger.httpclient.level = DEBUG
\ No newline at end of file
[2/2] camel git commit: CAMEL-10238: Updated component docs
Posted by dh...@apache.org.
CAMEL-10238: Updated component docs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b15168c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b15168c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b15168c
Branch: refs/heads/master
Commit: 0b15168cd4747f7f1de8fb0f8e2230a1fe8ca4ea
Parents: 7d97e5b
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Mon Aug 29 22:33:26 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Mon Aug 29 22:33:26 2016 -0700
----------------------------------------------------------------------
.../SalesforceComponentConfiguration.java | 26 ++++++++++++++++++++
.../src/main/docs/salesforce-component.adoc | 8 ++++--
2 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0b15168c/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java b/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
index a971286..d27021a 100644
--- a/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
+++ b/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
@@ -269,6 +269,16 @@ public class SalesforceComponentConfiguration {
* Replay IDs to start from per channel name.
*/
private Map<String, Integer> initialReplayIdMap;
+ /**
+ * Backoff interval increment for Streaming connection restart attempts for
+ * failures beyond CometD auto-reconnect.
+ */
+ private long backoffIncrement;
+ /**
+ * Maximum backoff interval for Streaming connection restart attempts for
+ * failures beyond CometD auto-reconnect.
+ */
+ private long maxBackoff;
public SalesforceLoginConfig getLoginConfig() {
return loginConfig;
@@ -713,4 +723,20 @@ public class SalesforceComponentConfiguration {
public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
this.initialReplayIdMap = initialReplayIdMap;
}
+
+ public long getBackoffIncrement() {
+ return backoffIncrement;
+ }
+
+ public void setBackoffIncrement(long backoffIncrement) {
+ this.backoffIncrement = backoffIncrement;
+ }
+
+ public long getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ public void setMaxBackoff(long maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0b15168c/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index 8f1a73d..1e2e892 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -205,7 +205,7 @@ Options
// component options: START
-The Salesforce component supports 55 options which are listed below.
+The Salesforce component supports 57 options which are listed below.
@@ -268,6 +268,8 @@ The Salesforce component supports 55 options which are listed below.
| objectMapper | ObjectMapper | Custom Jackson ObjectMapper to use when serializing/deserializing Salesforce objects.
| defaultReplayId | Integer | Default replayId setting if no value is found in link initialReplayIdMap
| initialReplayIdMap | Map | Replay IDs to start from per channel name.
+| backoffIncrement | long | Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+| maxBackoff | long | Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
|=======================================================================
{% endraw %}
// component options: END
@@ -282,7 +284,7 @@ The Salesforce component supports 55 options which are listed below.
// endpoint options: START
-The Salesforce component supports 39 endpoint options which are listed below:
+The Salesforce component supports 41 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -294,6 +296,7 @@ The Salesforce component supports 39 endpoint options which are listed below:
| apexQueryParams | common | | Map | Query params for APEX method
| apexUrl | common | | String | APEX method URL
| apiVersion | common | | String | Salesforce API version defaults to SalesforceEndpointConfig.DEFAULT_VERSION
+| backoffIncrement | common | | long | Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
| batchId | common | | String | Bulk API Batch ID
| contentType | common | | ContentType | Bulk API content type one of XML CSV ZIP_XML ZIP_CSV
| defaultReplayId | common | | Integer | Default replayId setting if no value is found in link initialReplayIdMap
@@ -303,6 +306,7 @@ The Salesforce component supports 39 endpoint options which are listed below:
| initialReplayIdMap | common | | Map | Replay IDs to start from per channel name.
| instanceId | common | | String | Salesforce1 Analytics report execution instance ID
| jobId | common | | String | Bulk API Job ID
+| maxBackoff | common | | long | Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
| notifyForFields | common | | NotifyForFieldsEnum | Notify for fields options are ALL REFERENCED SELECT WHERE
| notifyForOperationCreate | common | | Boolean | Notify for create operation defaults to false (API version = 29.0)
| notifyForOperationDelete | common | | Boolean | Notify for delete operation defaults to false (API version = 29.0)