You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/02 06:49:23 UTC

[5/6] camel git commit: https://issues.apache.org/jira/browse/CAMEL-10215 Synchronizing receive methods

https://issues.apache.org/jira/browse/CAMEL-10215 Synchronizing receive methods


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b25ee2e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b25ee2e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b25ee2e

Branch: refs/heads/camel-2.16.x
Commit: 1b25ee2eda44a1c753a5288f4491c7566c4ffdac
Parents: 7cdaf19
Author: Siddharth Sharma <si...@jobvite-inc.com>
Authored: Mon Aug 1 15:49:16 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 2 08:49:02 2016 +0200

----------------------------------------------------------------------
 .../camel/impl/EventDrivenPollingConsumer.java  | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b25ee2e/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 624e9b2..117598f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -114,14 +114,17 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         }
 
         while (isRunAllowed()) {
-            try {
-                beforePoll(0);
-                // take will block waiting for message
-                return queue.take();
-            } catch (InterruptedException e) {
-                handleInterruptedException(e);
-            } finally {
-                afterPoll();
+            // CAMEL-10215 - synchronize so that beforePoll, the blocking take and afterPoll run together as one atomic sequence
+            synchronized(this) {
+                try {
+                    beforePoll(0);
+                    // take will block waiting for message
+                    return queue.take();
+                } catch (InterruptedException e) {
+                    handleInterruptedException(e);
+                } finally {
+                    afterPoll();
+                }
             }
         }
         LOG.trace("Consumer is not running, so returning null");
@@ -134,15 +137,18 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
             throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
         }
 
-        try {
-            // use the timeout value returned from beforePoll
-            timeout = beforePoll(timeout);
-            return queue.poll(timeout, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            handleInterruptedException(e);
-            return null;
-        } finally {
-            afterPoll();
+        // CAMEL-10215 - synchronize so that beforePoll, the timed poll and afterPoll run together as one atomic sequence
+        synchronized(this) {
+            try {
+                // use the timeout value returned from beforePoll
+                timeout = beforePoll(timeout);
+                return queue.poll(timeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                handleInterruptedException(e);
+                return null;
+            } finally {
+                afterPoll();
+            }
         }
     }