You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/09/28 10:51:40 UTC
svn commit: r699788 - in
/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel:
model/ResequencerType.java processor/StreamResequencer.java
Author: davsclaus
Date: Sun Sep 28 01:51:40 2008
New Revision: 699788
URL: http://svn.apache.org/viewvc?rev=699788&view=rev
Log:
CAMEL-126: Applied patch no. 3, with thanks to Martin Krasser.
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Sun Sep 28 01:51:40 2008
@@ -34,6 +34,7 @@
import org.apache.camel.model.language.ExpressionType;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.resequencer.ExpressionResultComparator;
import org.apache.camel.spi.RouteContext;
/**
@@ -163,6 +164,41 @@
stream(streamConfig);
}
+ public ResequencerType timeout(long timeout) {
+ if (batchConfig != null) {
+ batchConfig.setBatchTimeout(timeout);
+ } else {
+ streamConfig.setTimeout(timeout);
+ }
+ return this;
+ }
+
+ public ResequencerType size(int batchSize) {
+ if (batchConfig == null) {
+ throw new IllegalStateException("size() only supported for batch resequencer");
+ }
+ batchConfig.setBatchSize(batchSize);
+ return this;
+ }
+
+ public ResequencerType capacity(int capacity) {
+ if (streamConfig == null) {
+ throw new IllegalStateException("capacity() only supported for stream resequencer");
+ }
+ streamConfig.setCapacity(capacity);
+ return this;
+
+ }
+
+ public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
+ if (streamConfig == null) {
+ throw new IllegalStateException("comparator() only supported for stream resequencer");
+ }
+ streamConfig.setComparator(comparator);
+ return this;
+
+ }
+
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
if (batchConfig != null) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sun Sep 28 01:51:40 2008
@@ -200,15 +200,7 @@
}
public void process(Exchange exchange) throws Exception {
- if (engine.size() >= capacity) {
- Thread.sleep(getTimeout());
- } else {
- if (exchange != null) {
- engine.insert(exchange);
- }
- }
- engine.deliver();
-
+ // empty since exchanges come from endpoint's polling consumer
}
}
\ No newline at end of file