You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/06 17:42:27 UTC
[2/3] git commit: CAMEL-7112: Impl seda polling consumer to better
control taking from queue not using the other consumer which runs in the
background.
CAMEL-7112: Impl seda polling consumer to better control taking from queue not using the other consumer which runs in the background.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cd84ad6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cd84ad6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cd84ad6
Branch: refs/heads/camel-2.12.x
Commit: 3cd84ad6eef7cd9c85b77629729bf99ccd4b02dd
Parents: ceb9cde
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Feb 6 17:40:58 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Feb 6 17:41:13 2014 +0100
----------------------------------------------------------------------
.../camel/component/seda/SedaEndpoint.java | 8 +++
.../component/seda/SedaPollingConsumer.java | 71 ++++++++++++++++++++
.../ConsumerTemplateSedaQueueIssueTest.java | 65 ++++++++++++++++++
3 files changed, 144 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index f43027c..7ee8d03 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.WaitForTaskToComplete;
@@ -138,6 +139,13 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
return answer;
}
+ @Override
+ public PollingConsumer createPollingConsumer() throws Exception {
+ SedaPollingConsumer answer = new SedaPollingConsumer(this);
+ configureConsumer(answer);
+ return answer;
+ }
+
public synchronized BlockingQueue<Exchange> getQueue() {
if (queue == null) {
// prefer to lookup queue from component, so if this endpoint is re-created or re-started
http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
new file mode 100644
index 0000000..06bf3e9
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.util.ObjectHelper;
+
+public class SedaPollingConsumer extends PollingConsumerSupport {
+
+ public SedaPollingConsumer(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ @Override
+ public SedaEndpoint getEndpoint() {
+ return (SedaEndpoint) super.getEndpoint();
+ }
+
+ @Override
+ public Exchange receive() {
+ try {
+ return getEndpoint().getQueue().take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ @Override
+ public Exchange receiveNoWait() {
+ return getEndpoint().getQueue().poll();
+ }
+
+ @Override
+ public Exchange receive(long timeout) {
+ try {
+ return getEndpoint().getQueue().poll(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3cd84ad6/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
new file mode 100644
index 0000000..fc9df60
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ConsumerTemplateSedaQueueIssueTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaEndpoint;
+
+public class ConsumerTemplateSedaQueueIssueTest extends ContextTestSupport {
+
+ public void testConsumerTemplateSedaQueue() throws Exception {
+ template.sendBody("direct:start", "A");
+ template.sendBody("direct:start", "B");
+ template.sendBody("direct:start", "C");
+ template.sendBody("direct:start", "D");
+ template.sendBody("direct:start", "E");
+
+ SedaEndpoint seda = context.getEndpoint("seda:foo", SedaEndpoint.class);
+ assertEquals(5, seda.getCurrentQueueSize());
+
+ String body = consumer.receiveBody(seda, 1000, String.class);
+ assertEquals("A", body);
+ assertEquals(4, seda.getCurrentQueueSize());
+
+ body = consumer.receiveBody(seda, 1000, String.class);
+ assertEquals("B", body);
+ assertEquals(3, seda.getCurrentQueueSize());
+
+ body = consumer.receiveBody(seda, 1000, String.class);
+ assertEquals("C", body);
+ assertEquals(2, seda.getCurrentQueueSize());
+
+ body = consumer.receiveBody(seda, 1000, String.class);
+ assertEquals("D", body);
+ assertEquals(1, seda.getCurrentQueueSize());
+
+ body = consumer.receiveBody(seda, 1000, String.class);
+ assertEquals("E", body);
+ assertEquals(0, seda.getCurrentQueueSize());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("seda:foo");
+ }
+ };
+ }
+}