You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/09/28 10:51:40 UTC

svn commit: r699788 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel: model/ResequencerType.java processor/StreamResequencer.java

Author: davsclaus
Date: Sun Sep 28 01:51:40 2008
New Revision: 699788

URL: http://svn.apache.org/viewvc?rev=699788&view=rev
Log:
CAMEL-126: Applied patch no 3 with thanks to Martin Krasser

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Sun Sep 28 01:51:40 2008
@@ -34,6 +34,7 @@
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.resequencer.ExpressionResultComparator;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -163,6 +164,41 @@
         stream(streamConfig);
     }
     
+    public ResequencerType timeout(long timeout) {
+        if (batchConfig != null) {
+            batchConfig.setBatchTimeout(timeout);
+        } else {
+            streamConfig.setTimeout(timeout);
+        }
+        return this;
+    }
+    
+    public ResequencerType size(int batchSize) {
+        if (batchConfig == null) {
+            throw new IllegalStateException("size() only supported for batch resequencer");
+        }
+        batchConfig.setBatchSize(batchSize);
+        return this;
+    }
+
+    public ResequencerType capacity(int capacity) {
+        if (streamConfig == null) {
+            throw new IllegalStateException("capacity() only supported for stream resequencer");
+        }
+        streamConfig.setCapacity(capacity);
+        return this;
+        
+    }
+    
+    public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
+        if (streamConfig == null) {
+            throw new IllegalStateException("comparator() only supported for stream resequencer");
+        }
+        streamConfig.setComparator(comparator);
+        return this;
+        
+    }
+    
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         if (batchConfig != null) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=699788&r1=699787&r2=699788&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Sun Sep 28 01:51:40 2008
@@ -200,15 +200,7 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (engine.size() >= capacity) {
-            Thread.sleep(getTimeout());
-        } else {
-            if (exchange != null) {
-                engine.insert(exchange);
-            }
-        }
-        engine.deliver();
-        
+        // empty since exchanges come from endpoint's polling consumer
     }
 
 }
\ No newline at end of file