You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/02/22 10:57:23 UTC
svn commit: r1073278 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel: model/
model/config/ processor/ processor/resequencer/
Author: davsclaus
Date: Tue Feb 22 09:57:22 2011
New Revision: 1073278
URL: http://svn.apache.org/viewvc?rev=1073278&view=rev
Log:
CAMEL-3696: Removed list of Expression for Resequencer EIP as it was not support and useable anyway. Only 1 expression is needed.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Tue Feb 22 09:57:22 2011
@@ -1621,8 +1621,7 @@ public abstract class ProcessorDefinitio
ResequenceDefinition answer = new ResequenceDefinition();
addOutput(answer);
ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer);
- answer.expression(clause);
- return clause;
+ return ExpressionClause.createAndSetExpression(answer);
}
/**
@@ -1633,36 +1632,13 @@ public abstract class ProcessorDefinitio
* @return the builder
*/
public ResequenceDefinition resequence(Expression expression) {
- return resequence(Collections.<Expression>singletonList(expression));
- }
-
- /**
- * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
- * Creates a resequencer allowing you to reorganize messages based on some comparator.
- *
- * @param expressions the list of expressions on which to compare messages in order
- * @return the builder
- */
- public ResequenceDefinition resequence(List<Expression> expressions) {
- ResequenceDefinition answer = new ResequenceDefinition(expressions);
+ ResequenceDefinition answer = new ResequenceDefinition();
+ answer.setExpression(new ExpressionDefinition(expression));
addOutput(answer);
return answer;
}
/**
- * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
- * Creates a splitter allowing you to reorganise messages based on some comparator.
- *
- * @param expressions the list of expressions on which to compare messages in order
- * @return the builder
- */
- public ResequenceDefinition resequence(Expression... expressions) {
- List<Expression> list = new ArrayList<Expression>();
- list.addAll(Arrays.asList(expressions));
- return resequence(list);
- }
-
- /**
* <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
* Creates an aggregator allowing you to combine a number of messages together into a single message.
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Tue Feb 22 09:57:22 2011
@@ -16,23 +16,20 @@
*/
package org.apache.camel.model;
-import java.util.ArrayList;
-import java.util.List;
-
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.config.BatchResequencerConfig;
import org.apache.camel.model.config.StreamResequencerConfig;
-import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.resequencer.ExpressionResultComparator;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
/**
* Represents an XML <resequence/> element
@@ -40,32 +37,21 @@ import org.apache.camel.spi.RouteContext
* @version
*/
@XmlRootElement(name = "resequence")
-public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> {
- @XmlElementRef
- private List<ExpressionDefinition> expressions = new ArrayList<ExpressionDefinition>();
- @XmlElementRef
- private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
- // Binding annotation at setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ResequenceDefinition extends ExpressionNode {
+ @XmlElement(name = "batch-config", required = false)
private BatchResequencerConfig batchConfig;
- // Binding annotation at setter
+ @XmlElement(name = "stream-config", required = false)
private StreamResequencerConfig streamConfig;
- @XmlTransient
- private List<Expression> expressionList;
public ResequenceDefinition() {
- this(null);
- }
-
- public ResequenceDefinition(List<Expression> expressions) {
- this.expressionList = expressions;
- this.batch();
}
@Override
public String getShortName() {
return "resequence";
}
-
+
// Fluent API
// -------------------------------------------------------------------------
/**
@@ -115,17 +101,6 @@ public class ResequenceDefinition extend
}
/**
- * Sets the expression to use for reordering
- *
- * @param expression the expression
- * @return the builder
- */
- public ResequenceDefinition expression(ExpressionDefinition expression) {
- expressions.add(expression);
- return this;
- }
-
- /**
* Sets the timeout
* @param timeout timeout in millis
* @return the builder
@@ -145,9 +120,13 @@ public class ResequenceDefinition extend
* @return the builder
*/
public ResequenceDefinition size(int batchSize) {
- if (batchConfig == null) {
+ if (streamConfig != null) {
throw new IllegalStateException("size() only supported for batch resequencer");
}
+ // initialize batch mode as its default mode
+ if (batchConfig == null) {
+ batch();
+ }
batchConfig.setBatchSize(batchSize);
return this;
}
@@ -172,9 +151,13 @@ public class ResequenceDefinition extend
* @return the builder
*/
public ResequenceDefinition allowDuplicates() {
- if (batchConfig == null) {
+ if (streamConfig != null) {
throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
}
+ // initialize batch mode as its default mode
+ if (batchConfig == null) {
+ batch();
+ }
batchConfig.setAllowDuplicates(true);
return this;
}
@@ -188,9 +171,13 @@ public class ResequenceDefinition extend
* @return the builder
*/
public ResequenceDefinition reverse() {
- if (batchConfig == null) {
+ if (streamConfig != null) {
throw new IllegalStateException("reverse() only supported for batch resequencer");
}
+ // initialize batch mode as its default mode
+ if (batchConfig == null) {
+ batch();
+ }
batchConfig.setReverse(true);
return this;
}
@@ -211,65 +198,45 @@ public class ResequenceDefinition extend
@Override
public String toString() {
- return "Resequencer[" + getExpressions() + " -> " + getOutputs() + "]";
+ return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]";
}
@Override
public String getLabel() {
- return ExpressionDefinition.getLabel(getExpressions());
- }
-
- public List<ExpressionDefinition> getExpressions() {
- return expressions;
- }
-
- public List<Expression> getExpressionList() {
- return expressionList;
- }
-
- public List<ProcessorDefinition> getOutputs() {
- return outputs;
- }
-
- public void setOutputs(List<ProcessorDefinition> outputs) {
- this.outputs = outputs;
+ return "Resequencer[" + super.getLabel() + "]";
}
public BatchResequencerConfig getBatchConfig() {
return batchConfig;
}
- public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
- return batchConfig;
- }
-
public StreamResequencerConfig getStreamConfig() {
return streamConfig;
}
- @XmlElement(name = "batch-config", required = false)
public void setBatchConfig(BatchResequencerConfig batchConfig) {
- batch(batchConfig);
+ this.batchConfig = batchConfig;
}
- @XmlElement(name = "stream-config", required = false)
public void setStreamConfig(StreamResequencerConfig streamConfig) {
- stream(streamConfig);
+ this.streamConfig = streamConfig;
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (batchConfig != null) {
- return createBatchResequencer(routeContext, batchConfig);
- } else {
- // streamConfig should be non-null if batchConfig is null
+ if (streamConfig != null) {
return createStreamResequencer(routeContext, streamConfig);
+ } else {
+ if (batchConfig == null) {
+ // default as batch mode
+ batch();
+ }
+ return createBatchResequencer(routeContext, batchConfig);
}
}
/**
- * Creates a batch {@link Resequencer} instance applying the given
- * <code>config</code>.
+ * Creates a batch {@link Resequencer} instance applying the given <code>config</code>.
*
* @param routeContext route context.
* @param config batch resequencer configuration.
@@ -277,10 +244,14 @@ public class ResequenceDefinition extend
* @throws Exception can be thrown
*/
protected Resequencer createBatchResequencer(RouteContext routeContext,
- BatchResequencerConfig config) throws Exception {
-
+ BatchResequencerConfig config) throws Exception {
Processor processor = this.createChildProcessor(routeContext, true);
- Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext),
+ Expression expression = getExpression().createExpression(routeContext);
+
+ ObjectHelper.notNull(config, "config", this);
+ ObjectHelper.notNull(expression, "expression", this);
+
+ Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, expression,
config.isAllowDuplicates(), config.isReverse());
resequencer.setBatchSize(config.getBatchSize());
resequencer.setBatchTimeout(config.getBatchTimeout());
@@ -288,36 +259,28 @@ public class ResequenceDefinition extend
}
/**
- * Creates a {@link StreamResequencer} instance applying the given
- * <code>config</code>.
+ * Creates a {@link StreamResequencer} instance applying the given <code>config</code>.
*
* @param routeContext route context.
* @param config stream resequencer configuration.
* @return the configured stream resequencer.
* @throws Exception can be thrwon
*/
- protected StreamResequencer createStreamResequencer(RouteContext routeContext,
- StreamResequencerConfig config) throws Exception {
-
- config.getComparator().setExpressions(resolveExpressionList(routeContext));
+ protected StreamResequencer createStreamResequencer(RouteContext routeContext,
+ StreamResequencerConfig config) throws Exception {
Processor processor = this.createChildProcessor(routeContext, true);
- StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, config.getComparator());
+ Expression expression = getExpression().createExpression(routeContext);
+
+ ObjectHelper.notNull(config, "config", this);
+ ObjectHelper.notNull(expression, "expression", this);
+
+ ExpressionResultComparator comparator = config.getComparator();
+ comparator.setExpression(expression);
+
+ StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator);
resequencer.setTimeout(config.getTimeout());
resequencer.setCapacity(config.getCapacity());
return resequencer;
-
}
- private List<Expression> resolveExpressionList(RouteContext routeContext) {
- if (expressionList == null) {
- expressionList = new ArrayList<Expression>();
- for (ExpressionDefinition expression : expressions) {
- expressionList.add(expression.createExpression(routeContext));
- }
- }
- if (expressionList.isEmpty()) {
- throw new IllegalArgumentException("No expressions configured for: " + this);
- }
- return expressionList;
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Tue Feb 22 09:57:22 2011
@@ -24,46 +24,17 @@ import javax.xml.bind.annotation.XmlRoot
/**
* Defines the configuration parameters for the batch-processing
* {@link org.apache.camel.processor.Resequencer}. Usage example:
- *
- * <pre>
- * from("direct:start").resequence(body()).batch(
- * BatchResequencerConfig.getDefault()).to("mock:result")
- * </pre>
- * is equivalent to
- *
- * <pre>
- * from("direct:start").resequence(body()).batch().to("mock:result")
- * </pre>
- *
- * or
- *
- * <pre>
- * from("direct:start").resequence(body()).to("mock:result")
- * </pre>
- *
- * Custom values for <code>batchSize</code> and <code>batchTimeout</code>
- * can be set like in this example:
- *
- * <pre>
- * from("direct:start").resequence(body()).batch(
- * new BatchResequencerConfig(300, 400L)).to("mock:result")
- * </pre>
- *
- * @version
*/
-@XmlRootElement
+@XmlRootElement(name = "batch-config")
@XmlAccessorType(XmlAccessType.FIELD)
public class BatchResequencerConfig {
@XmlAttribute
- private Integer batchSize; // optional XML attribute requires wrapper object
-
+ private Integer batchSize;
@XmlAttribute
- private Long batchTimeout; // optional XML attribute requires wrapper object
-
+ private Long batchTimeout;
@XmlAttribute
private Boolean allowDuplicates;
-
@XmlAttribute
private Boolean reverse;
@@ -75,26 +46,24 @@ public class BatchResequencerConfig {
public BatchResequencerConfig() {
this(100, 1000L);
}
-
+
/**
* Creates a new {@link BatchResequencerConfig} instance using the given
* values for <code>batchSize</code> and <code>batchTimeout</code>.
- *
- * @param batchSize
- * size of the batch to be re-ordered.
- * @param batchTimeout
- * timeout for collecting elements to be re-ordered.
+ *
+ * @param batchSize size of the batch to be re-ordered.
+ * @param batchTimeout timeout for collecting elements to be re-ordered.
*/
public BatchResequencerConfig(int batchSize, long batchTimeout) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
}
-
+
/**
* Returns a new {@link BatchResequencerConfig} instance using default
* values for <code>batchSize</code> (100) and <code>batchTimeout</code>
* (1000L).
- *
+ *
* @return a default {@link BatchResequencerConfig}.
*/
public static BatchResequencerConfig getDefault() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Tue Feb 22 09:57:22 2011
@@ -27,39 +27,17 @@ import org.apache.camel.processor.resequ
/**
* Defines the configuration parameters for the {@link org.apache.camel.processor.StreamResequencer}.
- * Usage example:
- *
- * <pre>
- * from("direct:start").resequencer(header("seqnum")).stream(
- * StreamResequencerConfig.getDefault()).to("mock:result")
- * </pre>
- *
- * is equivalent to
- *
- * <pre>
- * from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")
- * </pre>
- *
- * Custom values for <code>capacity</code> and <code>timeout</code> can be
- * set like in this example:
- *
- * <pre>
- * from("direct:start").resequencer(header("seqnum")).stream(
- * new StreamResequencerConfig(300, 400L)).to("mock:result")
- * </pre>
- *
+ *
* @version
*/
-@XmlRootElement
+@XmlRootElement(name = "stream-config")
@XmlAccessorType(XmlAccessType.FIELD)
public class StreamResequencerConfig {
@XmlAttribute
- private Integer capacity; // optional XML attribute requires wrapper object
-
+ private Integer capacity;
@XmlAttribute
- private Long timeout; // optional XML attribute requires wrapper object
-
+ private Long timeout;
@XmlTransient
private ExpressionResultComparator comparator;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Tue Feb 22 09:57:22 2011
@@ -42,9 +42,9 @@ public class Resequencer extends BatchPr
this(camelContext, processor, createSet(expression, false, false));
}
- public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions,
+ public Resequencer(CamelContext camelContext, Processor processor, Expression expression,
boolean allowDuplicates, boolean reverse) {
- this(camelContext, processor, createSet(expressions, allowDuplicates, reverse));
+ this(camelContext, processor, createSet(expression, allowDuplicates, reverse));
}
public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Tue Feb 22 09:57:22 2011
@@ -47,7 +47,7 @@ import org.apache.camel.util.ServiceHelp
* successor is known. For example a message with the sequence number 3 has a
* predecessor message with the sequence number 2 and a successor message with
* the sequence number 4. The message sequence 2,3,5 has a gap because the
- * sucessor of 3 is missing. The resequencer therefore has to retain message 5
+ * successor of 3 is missing. The resequencer therefore has to retain message 5
* until message 4 arrives (or a timeout occurs).
* <p>
* Instances of this class poll for {@link Exchange}s from a given
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java Tue Feb 22 09:57:22 2011
@@ -16,20 +16,13 @@
*/
package org.apache.camel.processor.resequencer;
-import java.util.List;
-
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
/**
* Compares elements of an {@link Exchange} sequence by comparing
* <code>long</code> values returned by this comaprator's
- * <code>expression</code>. The expression is set during route definition
- * e.g.
- *
- * <pre>
- * ...resequencer(header("seqnum")).stream()...
- * </pre>
+ * <code>expression</code>.
*
* @version
*/
@@ -37,23 +30,10 @@ public class DefaultExchangeComparator i
private Expression expression;
- public Expression getExpression() {
- return expression;
- }
-
public void setExpression(Expression expression) {
this.expression = expression;
}
- public void setExpressions(List<Expression> expressions) {
- if (expressions.isEmpty()) {
- throw new IllegalArgumentException("Expression required to resolve sequence number");
- } else if (expressions.size() > 1) {
- throw new IllegalArgumentException("More than one expression currently not supported");
- }
- expression = expressions.get(0);
- }
-
public boolean predecessor(Exchange o1, Exchange o2) {
long n1 = getSequenceNumber(o1);
long n2 = getSequenceNumber(o2);
@@ -75,5 +55,4 @@ public class DefaultExchangeComparator i
private long getSequenceNumber(Exchange exchange) {
return expression.evaluate(exchange, Long.class);
}
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java Tue Feb 22 09:57:22 2011
@@ -16,8 +16,6 @@
*/
package org.apache.camel.processor.resequencer;
-import java.util.List;
-
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -30,10 +28,10 @@ import org.apache.camel.Expression;
public interface ExpressionResultComparator extends SequenceElementComparator<Exchange> {
/**
- * Sets the list expressions used for comparing {@link Exchange}s.
+ * Set the expression sed for comparing {@link Exchange}s.
*
- * @param expressions a list of {@link Expression} objects.
+ * @param expression the expression
*/
- void setExpressions(List<Expression> expressions);
+ void setExpression(Expression expression);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Tue Feb 22 09:57:22 2011
@@ -41,7 +41,7 @@ public interface SequenceElementComparat
*
* @param o1 a sequence element.
* @param o2 a sequence element.
- * @return true if its an immediate sucessor
+ * @return true if its an immediate successor
*/
boolean successor(E o1, E o2);