You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/06 17:42:27 UTC

[2/3] git commit: CAMEL-7112: Impl seda polling consumer to better control taking from queue not using the other consumer which runs in the background.

CAMEL-7112: Impl seda polling consumer to better control taking from queue not using the other consumer which runs in the background.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cd84ad6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cd84ad6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cd84ad6

Branch: refs/heads/camel-2.12.x
Commit: 3cd84ad6eef7cd9c85b77629729bf99ccd4b02dd
Parents: ceb9cde
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Feb 6 17:40:58 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Feb 6 17:41:13 2014 +0100

----------------------------------------------------------------------
 .../camel/component/seda/SedaEndpoint.java      |  8 +++
 .../component/seda/SedaPollingConsumer.java     | 71 ++++++++++++++++++++
 .../ConsumerTemplateSedaQueueIssueTest.java     | 65 ++++++++++++++++++
 3 files changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index f43027c..7ee8d03 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.WaitForTaskToComplete;
@@ -138,6 +139,13 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
         return answer;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() throws Exception {
+        SedaPollingConsumer answer = new SedaPollingConsumer(this);
+        configureConsumer(answer);
+        return answer;
+    }
+
     public synchronized BlockingQueue<Exchange> getQueue() {
         if (queue == null) {
             // prefer to lookup queue from component, so if this endpoint is re-created or re-started

http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
new file mode 100644
index 0000000..06bf3e9
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.util.ObjectHelper;
+
+public class SedaPollingConsumer extends PollingConsumerSupport {
+
+    public SedaPollingConsumer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public SedaEndpoint getEndpoint() {
+        return (SedaEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public Exchange receive() {
+        try {
+            return getEndpoint().getQueue().take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    public Exchange receiveNoWait() {
+        return getEndpoint().getQueue().poll();
+    }
+
+    @Override
+    public Exchange receive(long timeout) {
+        try {
+            return getEndpoint().getQueue().poll(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
new file mode 100644
index 0000000..fc9df60
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaEndpoint;
+
+public class ConsumerTemplateSedaQueueIssueTest extends ContextTestSupport {
+
+    public void testConsumerTemplateSedaQueue() throws Exception {
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        template.sendBody("direct:start", "C");
+        template.sendBody("direct:start", "D");
+        template.sendBody("direct:start", "E");
+
+        SedaEndpoint seda = context.getEndpoint("seda:foo", SedaEndpoint.class);
+        assertEquals(5, seda.getCurrentQueueSize());
+
+        String body = consumer.receiveBody(seda, 1000, String.class);
+        assertEquals("A", body);
+        assertEquals(4, seda.getCurrentQueueSize());
+
+        body = consumer.receiveBody(seda, 1000, String.class);
+        assertEquals("B", body);
+        assertEquals(3, seda.getCurrentQueueSize());
+
+        body = consumer.receiveBody(seda, 1000, String.class);
+        assertEquals("C", body);
+        assertEquals(2, seda.getCurrentQueueSize());
+
+        body = consumer.receiveBody(seda, 1000, String.class);
+        assertEquals("D", body);
+        assertEquals(1, seda.getCurrentQueueSize());
+
+        body = consumer.receiveBody(seda, 1000, String.class);
+        assertEquals("E", body);
+        assertEquals(0, seda.getCurrentQueueSize());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("seda:foo");
+            }
+        };
+    }
+}