You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dh...@apache.org on 2016/08/30 05:33:45 UTC

[1/2] camel git commit: CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties

Repository: camel
Updated Branches:
  refs/heads/master c07557607 -> 0b15168cd


CAMEL-10238: Refactored to handle handshake failure, added increment and max backoff properties


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d97e5b5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d97e5b5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d97e5b5

Branch: refs/heads/master
Commit: 7d97e5b5800969b74e805e66a94a6c5323bfeafc
Parents: c075576
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Mon Aug 29 22:32:56 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Mon Aug 29 22:32:56 2016 -0700

----------------------------------------------------------------------
 .../salesforce/SalesforceComponent.java         |  24 +++
 .../salesforce/SalesforceEndpointConfig.java    |  36 +++-
 .../internal/streaming/SubscriptionHelper.java  | 165 +++++++++++--------
 .../src/test/resources/log4j2.properties        |   5 +
 4 files changed, 158 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 76f7012..7b7b81a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -1060,4 +1060,28 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin
     public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
         getConfigOrCreate().setInitialReplayIdMap(initialReplayIdMap);
     }
+
+    public long getBackoffIncrement() {
+        return getConfigOrCreate().getBackoffIncrement();
+    }
+
+    /**
+     * Backoff interval increment for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+     * @param backoffIncrement increment in milliseconds, passed on to the component configuration
+     */
+    public void setBackoffIncrement(long backoffIncrement) {
+        getConfigOrCreate().setBackoffIncrement(backoffIncrement);
+    }
+
+    public long getMaxBackoff() {
+        return getConfigOrCreate().getMaxBackoff();
+    }
+
+    /**
+     * Maximum backoff interval for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+     * @param maxBackoff upper limit in milliseconds, passed on to the component configuration
+     */
+    public void setMaxBackoff(long maxBackoff) {
+        getConfigOrCreate().setMaxBackoff(maxBackoff);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 39d0d06..ad18aab 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -79,6 +79,10 @@ public class SalesforceEndpointConfig implements Cloneable {
     // default maximum authentication retries on failed authentication or expired session
     public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
 
+    // default increment and limit for Streaming connection restart attempts
+    public static final long DEFAULT_BACKOFF_INCREMENT = 1000L;
+    public static final long DEFAULT_MAX_BACKOFF = 30000L;
+
     // general properties
     @UriParam
     private String apiVersion = DEFAULT_VERSION;
@@ -161,6 +165,14 @@ public class SalesforceEndpointConfig implements Cloneable {
     @UriParam
     private ObjectMapper objectMapper;
 
+    // Streaming connection restart attempt backoff interval increment
+    @UriParam
+    private long backoffIncrement = DEFAULT_BACKOFF_INCREMENT;
+
+    // Streaming connection restart attempt maximum backoff interval
+    @UriParam
+    private long maxBackoff = DEFAULT_MAX_BACKOFF;
+
     public SalesforceEndpointConfig copy() {
         try {
             final SalesforceEndpointConfig copy = (SalesforceEndpointConfig) super.clone();
@@ -505,6 +517,28 @@ public class SalesforceEndpointConfig implements Cloneable {
         return objectMapper;
     }
 
+    public long getBackoffIncrement() {
+        return backoffIncrement;
+    }
+
+    /**
+     * Backoff interval increment, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+     */
+    public void setBackoffIncrement(long backoffIncrement) {
+        this.backoffIncrement = backoffIncrement;
+    }
+
+    public long getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    /**
+     * Maximum backoff interval, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+     */
+    public void setMaxBackoff(long maxBackoff) {
+        this.maxBackoff = maxBackoff;
+    }
+
     /**
      * Custom Jackson ObjectMapper to use when serializing/deserializing Salesforce objects.
      */
@@ -538,7 +572,7 @@ public class SalesforceEndpointConfig implements Cloneable {
         valueMap.put(JOB_ID, jobId);
         valueMap.put(BATCH_ID, batchId);
         valueMap.put(RESULT_ID, resultId);
-        
+
         // add analytics API properties
         valueMap.put(REPORT_ID, reportId);
         valueMap.put(INCLUDE_DETAILS, includeDetails);

http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index c9a98ee..ed4c152 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -70,6 +71,8 @@ public class SubscriptionHelper extends ServiceSupport {
     private final long timeout = 60 * 1000L;
 
     private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
+    private final long maxBackoff;
+    private final long backoffIncrement;
 
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
@@ -81,6 +84,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private volatile Exception connectException;
 
     private volatile boolean reconnecting;
+    private final AtomicLong restartBackoff;
 
     public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception {
         this.component = component;
@@ -90,6 +94,10 @@ public class SubscriptionHelper extends ServiceSupport {
 
         // create CometD client
         this.client = createClient(topicName);
+
+        restartBackoff = new AtomicLong(0);
+        backoffIncrement = component.getConfig().getBackoffIncrement();
+        maxBackoff = component.getConfig().getMaxBackoff();
     }
 
     @Override
@@ -112,6 +120,10 @@ public class SubscriptionHelper extends ServiceSupport {
                         LOG.warn("Handshake failure: {}", message);
                         handshakeError = (String) message.get(ERROR_FIELD);
                         handshakeException = getFailure(message);
+
+                        // restart if handshake fails for any reason
+                        restartClient();
+
                     } else if (!listenerMap.isEmpty()) {
                         reconnecting = true;
                     }
@@ -133,12 +145,12 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectException = getFailure(message);
 
                         if (connectError != null) {
-                            // refresh oauth token, if it's a 401 error
-                            if (connectError.startsWith("401::")) {
+                            // refresh oauth token, if it's a 403 error
+                            if (connectError.startsWith("403::")) {
                                 try {
                                     session.login(null);
                                 } catch (SalesforceException e) {
-                                    LOG.error("Error renewing OAuth token on Connect 401: {} ", e.getMessage(), e);
+                                    LOG.error("Error renewing OAuth token on Connect 403: " + e.getMessage(), e);
                                 }
                             }
                         }
@@ -170,74 +182,7 @@ public class SubscriptionHelper extends ServiceSupport {
             disconnectListener = new ClientSessionChannel.MessageListener() {
                 @Override
                 public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
-
-                    // launch an async task to reconnect
-                    final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
-
-                    httpClient.getExecutor().execute(new Runnable() {
-                        @Override
-                        public void run() {
-
-                            boolean abort = false;
-                            // wait for disconnect
-                            while (!client.isDisconnected()) {
-                                try {
-                                    Thread.sleep(DISCONNECT_INTERVAL);
-                                } catch (InterruptedException e) {
-                                    LOG.error("Aborting reconnect on interrupt!");
-                                    abort = true;
-                                }
-                            }
-
-                            if (!abort) {
-
-                                LOG.info("Reconnecting on unexpected disconnect from Salesforce...");
-                                final long backoffIncrement = client.getBackoffIncrement();
-                                final long maxBackoff = client.getMaxBackoff();
-
-                                long backoff = backoffIncrement;
-                                String msg = String.format("Failed to reconnect, exceeded maximum backoff %s msecs", maxBackoff);
-                                Exception lastError = new SalesforceException(msg, null);
-
-                                // retry until interrupted, or handshook or connect backoff exceeded
-                                while (!abort && !client.isHandshook() && backoff < maxBackoff) {
-
-                                    try {
-                                        // reset client
-                                        doStop();
-
-                                        // register listeners and restart
-                                        doStart();
-
-                                    } catch (Exception e) {
-                                        LOG.error("Error reconnecting to Salesforce: {}", e.getMessage(), e);
-                                        lastError = e;
-                                    }
-
-                                    if (!client.isHandshook()) {
-                                        LOG.debug("Pausing for {} msecs after reconnect failure", backoff);
-                                        try {
-                                            Thread.sleep(backoff);
-                                        } catch (InterruptedException e) {
-                                            LOG.error("Aborting reconnect on interrupt!");
-                                            abort = true;
-                                        }
-                                        backoff += backoffIncrement;
-                                    }
-                                }
-
-                                if (client.isHandshook()) {
-                                    LOG.info("Successfully reconnected to Salesforce!");
-                                } else if (!abort) {
-                                    // notify all consumers
-                                    String abortMsg = "Aborting Salesforce reconnect due to: " + lastError.getMessage();
-                                    for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                        consumer.handleException(abortMsg, new SalesforceException(abortMsg, lastError));
-                                    }
-                                }
-                            }
-                        }
-                    });
+                    restartClient();
                 }
             };
         }
@@ -267,6 +212,84 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    /**
+     * Schedules an asynchronous restart of the CometD client on the HTTP client's executor.
+     * The task waits for the client to disconnect, pauses for the current restart backoff
+     * (advanced by backoffIncrement on every attempt), then calls doStop() and doStart().
+     * Consumers are notified when a restart fails and the next backoff would exceed maxBackoff.
+     */
+    private void restartClient() {
+        final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
+        httpClient.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                LOG.info("Restarting on unexpected disconnect from Salesforce...");
+                boolean abort = false;
+
+                // wait for disconnect, but stop waiting as soon as this thread is interrupted
+                LOG.debug("Waiting to disconnect...");
+                while (!abort && !client.isDisconnected()) {
+                    try {
+                        Thread.sleep(DISCONNECT_INTERVAL);
+                    } catch (InterruptedException e) {
+                        LOG.error("Aborting restart on interrupt!");
+                        Thread.currentThread().interrupt();
+                        abort = true;
+                    }
+                }
+
+                if (!abort) {
+                    // update restart attempt backoff; getAndAdd returns the value before the increment
+                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+                    if (backoff > maxBackoff) {
+                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                        abort = true;
+                    } else {
+                        // pause before restart attempt
+                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                        try {
+                            Thread.sleep(backoff);
+                        } catch (InterruptedException e) {
+                            LOG.error("Aborting restart on interrupt!");
+                            Thread.currentThread().interrupt();
+                            abort = true;
+                        }
+                    }
+
+                    if (!abort) {
+                        Exception lastError = new SalesforceException("Unknown error", null);
+                        try {
+                            // reset client
+                            doStop();
+
+                            // register listeners and restart
+                            doStart();
+                        } catch (Exception e) {
+                            LOG.error("Error restarting: " + e.getMessage(), e);
+                            lastError = e;
+                        }
+
+                        if (client.isHandshook()) {
+                            LOG.info("Successfully restarted!");
+                            // reset backoff; NOTE(review): uses the CometD client's increment, not the configured backoffIncrement - confirm
+                            restartBackoff.set(client.getBackoffIncrement());
+                        } else {
+                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                            if ((backoff + backoffIncrement) > maxBackoff) {
+                                // notify all consumers
+                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                                    consumer.handleException(abortMsg, ex);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
     @SuppressWarnings("unchecked")
     private Exception getFailure(Message message) {
         Exception exception = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/7d97e5b5/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
index ba27a06..4b5b208 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/resources/log4j2.properties
@@ -26,3 +26,8 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file
+
+logger.salesforce.name = org.apache.camel.component.salesforce
+logger.salesforce.level = TRACE
+#logger.httpclient.name = org.eclipse.jetty
+#logger.httpclient.level = DEBUG
\ No newline at end of file


[2/2] camel git commit: CAMEL-10238: Updated component docs

Posted by dh...@apache.org.
CAMEL-10238: Updated component docs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b15168c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b15168c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b15168c

Branch: refs/heads/master
Commit: 0b15168cd4747f7f1de8fb0f8e2230a1fe8ca4ea
Parents: 7d97e5b
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Mon Aug 29 22:33:26 2016 -0700
Committer: Dhiraj Bokde <dh...@yahoo.com>
Committed: Mon Aug 29 22:33:26 2016 -0700

----------------------------------------------------------------------
 .../SalesforceComponentConfiguration.java       | 26 ++++++++++++++++++++
 .../src/main/docs/salesforce-component.adoc     |  8 ++++--
 2 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0b15168c/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java b/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
index a971286..d27021a 100644
--- a/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
+++ b/components-starter/camel-salesforce-starter/src/main/java/org/apache/camel/component/salesforce/springboot/SalesforceComponentConfiguration.java
@@ -269,6 +269,16 @@ public class SalesforceComponentConfiguration {
      * Replay IDs to start from per channel name.
      */
     private Map<String, Integer> initialReplayIdMap;
+    /**
+     * Backoff interval increment for Streaming connection restart attempts for
+     * failures beyond CometD auto-reconnect.
+     */
+    private long backoffIncrement;
+    /**
+     * Maximum backoff interval for Streaming connection restart attempts for
+     * failures beyond CometD auto-reconnect.
+     */
+    private long maxBackoff;
 
     public SalesforceLoginConfig getLoginConfig() {
         return loginConfig;
@@ -713,4 +723,20 @@ public class SalesforceComponentConfiguration {
     public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
         this.initialReplayIdMap = initialReplayIdMap;
     }
+
+    public long getBackoffIncrement() {
+        return backoffIncrement;
+    }
+
+    public void setBackoffIncrement(long backoffIncrement) {
+        this.backoffIncrement = backoffIncrement;
+    }
+
+    public long getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    public void setMaxBackoff(long maxBackoff) {
+        this.maxBackoff = maxBackoff;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/0b15168c/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index 8f1a73d..1e2e892 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -205,7 +205,7 @@ Options
 
 
 // component options: START
-The Salesforce component supports 55 options which are listed below.
+The Salesforce component supports 57 options which are listed below.
 
 
 
@@ -268,6 +268,8 @@ The Salesforce component supports 55 options which are listed below.
 | objectMapper | ObjectMapper | Custom Jackson ObjectMapper to use when serializing/deserializing Salesforce objects.
 | defaultReplayId | Integer | Default replayId setting if no value is found in link initialReplayIdMap
 | initialReplayIdMap | Map | Replay IDs to start from per channel name.
+| backoffIncrement | long | Backoff interval increment, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
+| maxBackoff | long | Maximum backoff interval, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
 |=======================================================================
 {% endraw %}
 // component options: END
@@ -282,7 +284,7 @@ The Salesforce component supports 55 options which are listed below.
 
 
 // endpoint options: START
-The Salesforce component supports 39 endpoint options which are listed below:
+The Salesforce component supports 41 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -294,6 +296,7 @@ The Salesforce component supports 39 endpoint options which are listed below:
 | apexQueryParams | common |  | Map | Query params for APEX method
 | apexUrl | common |  | String | APEX method URL
 | apiVersion | common |  | String | Salesforce API version defaults to SalesforceEndpointConfig.DEFAULT_VERSION
+| backoffIncrement | common |  | long | Backoff interval increment, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
 | batchId | common |  | String | Bulk API Batch ID
 | contentType | common |  | ContentType | Bulk API content type one of XML CSV ZIP_XML ZIP_CSV
 | defaultReplayId | common |  | Integer | Default replayId setting if no value is found in link initialReplayIdMap
@@ -303,6 +306,7 @@ The Salesforce component supports 39 endpoint options which are listed below:
 | initialReplayIdMap | common |  | Map | Replay IDs to start from per channel name.
 | instanceId | common |  | String | Salesforce1 Analytics report execution instance ID
 | jobId | common |  | String | Bulk API Job ID
+| maxBackoff | common |  | long | Maximum backoff interval, in milliseconds, for Streaming connection restart attempts for failures beyond CometD auto-reconnect.
 | notifyForFields | common |  | NotifyForFieldsEnum | Notify for fields options are ALL REFERENCED SELECT WHERE
 | notifyForOperationCreate | common |  | Boolean | Notify for create operation defaults to false (API version = 29.0)
 | notifyForOperationDelete | common |  | Boolean | Notify for delete operation defaults to false (API version = 29.0)