You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/21 17:11:02 UTC
[1/2] camel git commit: CAMEL-8526: Add more EIP as specialized mbeans
Repository: camel
Updated Branches:
refs/heads/master 4de78cea6 -> edb9fd303
CAMEL-8526: Add more EIP as specialized mbeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f3d9749
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f3d9749
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f3d9749
Branch: refs/heads/master
Commit: 3f3d974956fefe0ef163d49ce3f1d19f43af2e2b
Parents: 4de78ce
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 21 16:59:47 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 21 16:59:47 2015 +0200
----------------------------------------------------------------------
.../mbean/ManagedResequencerMBean.java | 47 ++++++++
.../DefaultManagementObjectStrategy.java | 7 ++
.../management/mbean/ManagedResequencer.java | 116 +++++++++++++++++++
.../camel/model/ConvertBodyDefinition.java | 2 -
.../org/apache/camel/model/ExpressionNode.java | 2 -
.../org/apache/camel/model/FromDefinition.java | 4 +-
.../apache/camel/model/ProcessDefinition.java | 2 -
.../camel/model/ResequenceDefinition.java | 8 +-
.../org/apache/camel/model/SendDefinition.java | 5 +-
.../apache/camel/model/SetHeaderDefinition.java | 2 -
.../camel/model/SetOutHeaderDefinition.java | 3 -
.../camel/model/SetPropertyDefinition.java | 2 -
.../model/language/ExpressionDefinition.java | 4 +-
.../apache/camel/processor/BatchProcessor.java | 24 ++++
.../camel/processor/StreamResequencer.java | 13 ++-
.../management/ManagedResequencerTest.java | 94 +++++++++++++++
16 files changed, 308 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
new file mode 100644
index 0000000..8eb9b4f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedResequencerMBean extends ManagedProcessorMBean {
+
+ @ManagedAttribute(description = "Expression to use for re-ordering the messages, such as a header with a sequence number")
+ String getExpression();
+
+ @ManagedAttribute(description = "The size of the batch to be re-ordered. The default size is 100.")
+ Integer getBatchSize();
+
+ @ManagedAttribute(description = "Minimum time to wait for missing elements (messages).")
+ Long getTimeout();
+
+ @ManagedAttribute(description = "Whether to allow duplicates.")
+ Boolean isAllowDuplicates();
+
+ @ManagedAttribute(description = "Whether to reverse the ordering.")
+ Boolean isReverse();
+
+ @ManagedAttribute(description = "Whether to ignore invalid exchanges")
+ Boolean isIgnoreInvalidExchanges();
+
+ @ManagedAttribute(description = "The capacity of the resequencer's inbound queue")
+ Integer getCapacity();
+
+ @ManagedAttribute(description = "If true, throws an exception when messages older than the last delivered message are processed")
+ Boolean isRejectOld();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index b00f746..661f914 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -52,6 +52,7 @@ import org.apache.camel.management.mbean.ManagedPollEnricher;
import org.apache.camel.management.mbean.ManagedProcessor;
import org.apache.camel.management.mbean.ManagedProducer;
import org.apache.camel.management.mbean.ManagedRecipientList;
+import org.apache.camel.management.mbean.ManagedResequencer;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedRoutingSlip;
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
@@ -77,9 +78,11 @@ import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.RoutingSlip;
import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.SendProcessor;
+import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.Throttler;
import org.apache.camel.processor.ThroughputLogger;
import org.apache.camel.processor.WireTapProcessor;
@@ -226,6 +229,10 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
answer = new ManagedRecipientList(context, (RecipientList) target, definition);
} else if (target instanceof MulticastProcessor) {
answer = new ManagedMulticast(context, (MulticastProcessor) target, definition);
+ } else if (target instanceof Resequencer) {
+ answer = new ManagedResequencer(context, (Resequencer) target, definition);
+ } else if (target instanceof StreamResequencer) {
+ answer = new ManagedResequencer(context, (StreamResequencer) target, definition);
} else if (target instanceof WireTapProcessor) {
answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition);
} else if (target instanceof SendDynamicProcessor) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
new file mode 100644
index 0000000..a3429cc
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedResequencerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.Resequencer;
+import org.apache.camel.processor.StreamResequencer;
+
+/**
+ * @version
+ */
+@ManagedResource(description = "Managed Resequencer")
+public class ManagedResequencer extends ManagedProcessor implements ManagedResequencerMBean {
+ private final Resequencer processor;
+ private final StreamResequencer streamProcessor;
+ private final String expression;
+
+ public ManagedResequencer(CamelContext context, Resequencer processor, ProcessorDefinition<?> definition) {
+ super(context, processor, definition);
+ this.processor = processor;
+ this.streamProcessor = null;
+ this.expression = processor.getExpression().toString();
+ }
+
+ public ManagedResequencer(CamelContext context, StreamResequencer processor, ProcessorDefinition<?> definition) {
+ super(context, processor, definition);
+ this.processor = null;
+ this.streamProcessor = processor;
+ this.expression = streamProcessor.getExpression().toString();
+ }
+
+ @Override
+ public String getExpression() {
+ return expression;
+ }
+
+ @Override
+ public Integer getBatchSize() {
+ if (processor != null) {
+ return processor.getBatchSize();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Long getTimeout() {
+ if (processor != null) {
+ return processor.getBatchTimeout();
+ } else {
+ return streamProcessor.getTimeout();
+ }
+ }
+
+ @Override
+ public Boolean isAllowDuplicates() {
+ if (processor != null) {
+ return processor.isAllowDuplicates();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Boolean isReverse() {
+ if (processor != null) {
+ return processor.isReverse();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Boolean isIgnoreInvalidExchanges() {
+ if (processor != null) {
+ return processor.isIgnoreInvalidExchanges();
+ } else {
+ return streamProcessor.isIgnoreInvalidExchanges();
+ }
+ }
+
+ @Override
+ public Integer getCapacity() {
+ if (processor != null) {
+ return null;
+ } else {
+ return streamProcessor.getCapacity();
+ }
+ }
+
+ @Override
+ public Boolean isRejectOld() {
+ if (processor != null) {
+ return null;
+ } else {
+ return streamProcessor.isRejectOld();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
index a2fbac8..402790e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
@@ -27,7 +27,6 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Processor;
import org.apache.camel.processor.ConvertBodyProcessor;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
/**
@@ -103,7 +102,6 @@ public class ConvertBodyDefinition extends NoOutputDefinition<ConvertBodyDefinit
/**
* The java type to convert to
*/
- @Required
public void setType(String type) {
this.type = type;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index 7dd1699..c8f1616 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -28,7 +28,6 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.FilterProcessor;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
/**
@@ -66,7 +65,6 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
return expression;
}
- @Required
public void setExpression(ExpressionDefinition expression) {
this.expression = expression;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
index 3c1fc1f..bbb8428 100644
--- a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
@@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -37,7 +36,7 @@ import org.apache.camel.util.ObjectHelper;
@XmlRootElement(name = "from")
@XmlAccessorType(XmlAccessType.FIELD)
public class FromDefinition extends OptionalIdentifiedDefinition<FromDefinition> implements EndpointRequiredDefinition {
- @XmlAttribute
+ @XmlAttribute @Metadata(required = "true")
private String uri;
@XmlAttribute
@Deprecated
@@ -96,7 +95,6 @@ public class FromDefinition extends OptionalIdentifiedDefinition<FromDefinition>
*
* @param uri the endpoint URI to use
*/
- @Required
public void setUri(String uri) {
clear();
this.uri = uri;
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
index b5f9f44..7d2b818 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
@@ -28,7 +28,6 @@ import org.apache.camel.Service;
import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.processor.DelegateSyncProcessor;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -84,7 +83,6 @@ public class ProcessDefinition extends NoOutputDefinition<ProcessDefinition> {
/**
* Reference to the {@link Processor} to lookup in the registry to use.
*/
- @Required
public void setRef(String ref) {
this.ref = ref;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
index f74abee..7f26a4f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.resequencer.ExpressionResultComparator;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;
@@ -61,8 +60,7 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
private BatchResequencerConfig batchConfig;
@XmlTransient
private StreamResequencerConfig streamConfig;
- @XmlElementRef
- @Required
+ @XmlElementRef @Metadata(required = "true")
private ExpressionDefinition expression;
@XmlElementRef
private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
@@ -364,6 +362,8 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, isAllowDuplicates, isReverse);
resequencer.setBatchSize(config.getBatchSize());
resequencer.setBatchTimeout(config.getBatchTimeout());
+ resequencer.setReverse(isReverse);
+ resequencer.setAllowDuplicates(isAllowDuplicates);
if (config.getIgnoreInvalidExchanges() != null) {
resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
}
@@ -397,7 +397,7 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
}
comparator.setExpression(expression);
- StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator);
+ StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator, expression);
resequencer.setTimeout(config.getTimeout());
resequencer.setCapacity(config.getCapacity());
resequencer.setRejectOld(config.getRejectOld());
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
index a3cbd57..83e5a70 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
@@ -25,7 +25,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.spi.Required;
+import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -36,7 +36,7 @@ import org.apache.camel.util.ObjectHelper;
*/
@XmlAccessorType(XmlAccessType.FIELD)
public abstract class SendDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputDefinition<Type> implements EndpointRequiredDefinition {
- @XmlAttribute
+ @XmlAttribute @Metadata(required = "true")
protected String uri;
@XmlAttribute
@Deprecated
@@ -99,7 +99,6 @@ public abstract class SendDefinition<Type extends ProcessorDefinition<Type>> ext
*
* @param uri the uri of the endpoint
*/
- @Required
public void setUri(String uri) {
this.uri = uri;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
index 13ebb47..6d1f794e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
@@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.SetHeaderProcessor;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -88,7 +87,6 @@ public class SetHeaderDefinition extends NoOutputExpressionNode {
/**
* Name of message header to set a new value
*/
- @Required
public void setHeaderName(String headerName) {
this.headerName = headerName;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
index dc405f1..5493f08 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
@@ -26,7 +26,6 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.ProcessorBuilder;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -40,7 +39,6 @@ import org.apache.camel.util.ObjectHelper;
@XmlAccessorType(XmlAccessType.FIELD)
@Deprecated
public class SetOutHeaderDefinition extends NoOutputExpressionNode {
- @Deprecated
@XmlAttribute(required = true)
private String headerName;
@@ -86,7 +84,6 @@ public class SetOutHeaderDefinition extends NoOutputExpressionNode {
/**
* Name of message header to set a new value
*/
- @Required
public void setHeaderName(String headerName) {
this.headerName = headerName;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
index eccb3d2..a0c1376 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
@@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.SetPropertyProcessor;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -88,7 +87,6 @@ public class SetPropertyDefinition extends NoOutputExpressionNode {
/**
* Name of exchange property to set a new value
*/
- @Required
public void setPropertyName(String propertyName) {
this.propertyName = propertyName;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
index bd3f4e9..cf8b1a5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.Predicate;
import org.apache.camel.model.OtherAttributesAware;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CollectionStringBuffer;
import org.apache.camel.util.ExpressionToPredicateAdapter;
@@ -56,7 +55,7 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu
@XmlAttribute
@XmlID
private String id;
- @XmlValue
+ @XmlValue @Metadata(required = "true")
private String expression;
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean trim;
@@ -209,7 +208,6 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu
/**
* The expression value in your chosen language syntax
*/
- @Required
public void setExpression(String expression) {
this.expression = expression;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
index b3c8f05..797c89f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
@@ -68,6 +68,8 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
private boolean groupExchanges;
private boolean batchConsumer;
private boolean ignoreInvalidExchanges;
+ private boolean reverse;
+ private boolean allowDuplicates;
private Predicate completionPredicate;
private Expression expression;
@@ -100,6 +102,12 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
// Properties
// -------------------------------------------------------------------------
+
+
+ public Expression getExpression() {
+ return expression;
+ }
+
public ExceptionHandler getExceptionHandler() {
return exceptionHandler;
}
@@ -176,6 +184,22 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
this.ignoreInvalidExchanges = ignoreInvalidExchanges;
}
+ public boolean isReverse() {
+ return reverse;
+ }
+
+ public void setReverse(boolean reverse) {
+ this.reverse = reverse;
+ }
+
+ public boolean isAllowDuplicates() {
+ return allowDuplicates;
+ }
+
+ public void setAllowDuplicates(boolean allowDuplicates) {
+ this.allowDuplicates = allowDuplicates;
+ }
+
public Predicate getCompletionPredicate() {
return completionPredicate;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 71af68f..e24b056 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -28,6 +28,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
@@ -76,6 +77,7 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
private final ExceptionHandler exceptionHandler;
private final ResequencerEngine<Exchange> engine;
private final Processor processor;
+ private final Expression expression;
private Delivery delivery;
private int capacity;
private boolean ignoreInvalidExchanges;
@@ -86,15 +88,20 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
* @param processor next processor that processes re-ordered exchanges.
* @param comparator a sequence element comparator for exchanges.
*/
- public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator) {
+ public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator, Expression expression) {
ObjectHelper.notNull(camelContext, "CamelContext");
this.camelContext = camelContext;
this.engine = new ResequencerEngine<Exchange>(comparator);
this.engine.setSequenceSender(this);
this.processor = processor;
+ this.expression = expression;
this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
}
+ public Expression getExpression() {
+ return expression;
+ }
+
/**
* Returns this resequencer's exception handler.
*/
@@ -150,6 +157,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
engine.setRejectOld(rejectOld);
}
+ public boolean isRejectOld() {
+ return engine.getRejectOld() != null && engine.getRejectOld();
+ }
+
/**
* Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer.
* <p/>
http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
new file mode 100644
index 0000000..8d5ce62
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class ManagedResequencerTest extends ManagementTestSupport {
+
+ public void testManageResequencer() throws Exception {
+ // JMX tests dont work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MockEndpoint foo = getMockEndpoint("mock:foo");
+ foo.expectedBodiesReceived("A", "B", "C");
+
+ template.sendBodyAndHeader("direct:start", "B", "num", "2");
+ template.sendBodyAndHeader("direct:start", "C", "num", "3");
+ template.sendBodyAndHeader("direct:start", "A", "num", "1");
+
+ assertMockEndpointsSatisfied();
+
+ // get the stats for the route
+ MBeanServer mbeanServer = getMBeanServer();
+
+ // get the object name for the delayer
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+ // should be on route1
+ String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+ assertEquals("route1", routeId);
+
+ String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+ assertEquals("camel-1", camelId);
+
+ String state = (String) mbeanServer.getAttribute(on, "State");
+ assertEquals(ServiceStatus.Started.name(), state);
+
+ String uri = (String) mbeanServer.getAttribute(on, "Expression");
+ assertEquals("header(num)", uri);
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "BatchSize");
+ assertEquals(3, size.intValue());
+
+ TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+ assertNotNull(data);
+ assertEquals(2, data.size());
+
+ data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
+ assertNotNull(data);
+ assertEquals(5, data.size());
+
+ String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
+ assertNotNull(json);
+ assertTrue(json.contains("\"description\": \"Resequences (re-order) messages based on an expression"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .resequence(header("num")).size(3).id("mysend")
+ .to("mock:foo");
+ }
+ };
+ }
+
+}
[2/2] camel git commit: CAMEL-8526: Add more EIP as specialized mbeans
Posted by da...@apache.org.
CAMEL-8526: Add more EIP as specialized mbeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/edb9fd30
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/edb9fd30
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/edb9fd30
Branch: refs/heads/master
Commit: edb9fd303831f9477dab5576cd463f56d633165c
Parents: 3f3d974
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 21 17:16:41 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 21 17:16:41 2015 +0200
----------------------------------------------------------------------
.../mbean/ManagedSamplingThrottlerMBean.java | 32 +++++++++++
.../DefaultManagementObjectStrategy.java | 4 ++
.../mbean/ManagedSamplingThrottler.java | 57 ++++++++++++++++++++
.../camel/processor/SamplingThrottler.java | 12 +++++
4 files changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
new file mode 100644
index 0000000..9b4b718
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedSamplingThrottlerMBean extends ManagedProcessorMBean {
+
+ @ManagedAttribute(description = "The sample period during which only a single Exchange will pass through")
+ Long getSamplePeriod();
+
+ @ManagedAttribute(description = "The sample message count which only a single Exchange will pass through after this many received")
+ Long getMessageFrequency();
+
+ @ManagedAttribute(description = "The time units for the sample period")
+ String getTimeUnit();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index 661f914..bc1686d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -55,6 +55,7 @@ import org.apache.camel.management.mbean.ManagedRecipientList;
import org.apache.camel.management.mbean.ManagedResequencer;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedRoutingSlip;
+import org.apache.camel.management.mbean.ManagedSamplingThrottler;
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
import org.apache.camel.management.mbean.ManagedSendProcessor;
@@ -80,6 +81,7 @@ import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.Resequencer;
import org.apache.camel.processor.RoutingSlip;
+import org.apache.camel.processor.SamplingThrottler;
import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.StreamResequencer;
@@ -229,6 +231,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
answer = new ManagedRecipientList(context, (RecipientList) target, definition);
} else if (target instanceof MulticastProcessor) {
answer = new ManagedMulticast(context, (MulticastProcessor) target, definition);
+ } else if (target instanceof SamplingThrottler) {
+ answer = new ManagedSamplingThrottler(context, (SamplingThrottler) target, definition);
} else if (target instanceof Resequencer) {
answer = new ManagedResequencer(context, (Resequencer) target, definition);
} else if (target instanceof StreamResequencer) {
http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
new file mode 100644
index 0000000..5e5cac8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Locale;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedSamplingThrottlerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.SamplingThrottler;
+
+/**
+ * @version
+ */
+@ManagedResource(description = "Managed SamplingThrottler")
+public class ManagedSamplingThrottler extends ManagedProcessor implements ManagedSamplingThrottlerMBean {
+ private final SamplingThrottler processor;
+
+ public ManagedSamplingThrottler(CamelContext context, SamplingThrottler processor, ProcessorDefinition<?> definition) {
+ super(context, processor, definition);
+ this.processor = processor;
+ }
+
+ @Override
+ public Long getSamplePeriod() {
+ return processor.getSamplePeriod();
+ }
+
+ @Override
+ public Long getMessageFrequency() {
+ return processor.getMessageFrequency();
+ }
+
+ @Override
+ public String getTimeUnit() {
+ if (processor.getUnits() != null) {
+ return processor.getUnits().toString().toLowerCase(Locale.ENGLISH);
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
index 661646c..9d6bbfe 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
@@ -102,6 +102,18 @@ public class SamplingThrottler extends DelegateAsyncProcessor implements Traceab
}
}
+ public long getMessageFrequency() {
+ return messageFrequency;
+ }
+
+ public long getSamplePeriod() {
+ return samplePeriod;
+ }
+
+ public TimeUnit getUnits() {
+ return units;
+ }
+
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
boolean doSend = false;