You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/21 17:11:02 UTC

[1/2] camel git commit: CAMEL-8526: Add more EIP as specialized mbeans

Repository: camel
Updated Branches:
  refs/heads/master 4de78cea6 -> edb9fd303


CAMEL-8526: Add more EIP as specialized mbeans


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f3d9749
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f3d9749
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f3d9749

Branch: refs/heads/master
Commit: 3f3d974956fefe0ef163d49ce3f1d19f43af2e2b
Parents: 4de78ce
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 21 16:59:47 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 21 16:59:47 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedResequencerMBean.java          |  47 ++++++++
 .../DefaultManagementObjectStrategy.java        |   7 ++
 .../management/mbean/ManagedResequencer.java    | 116 +++++++++++++++++++
 .../camel/model/ConvertBodyDefinition.java      |   2 -
 .../org/apache/camel/model/ExpressionNode.java  |   2 -
 .../org/apache/camel/model/FromDefinition.java  |   4 +-
 .../apache/camel/model/ProcessDefinition.java   |   2 -
 .../camel/model/ResequenceDefinition.java       |   8 +-
 .../org/apache/camel/model/SendDefinition.java  |   5 +-
 .../apache/camel/model/SetHeaderDefinition.java |   2 -
 .../camel/model/SetOutHeaderDefinition.java     |   3 -
 .../camel/model/SetPropertyDefinition.java      |   2 -
 .../model/language/ExpressionDefinition.java    |   4 +-
 .../apache/camel/processor/BatchProcessor.java  |  24 ++++
 .../camel/processor/StreamResequencer.java      |  13 ++-
 .../management/ManagedResequencerTest.java      |  94 +++++++++++++++
 16 files changed, 308 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
new file mode 100644
index 0000000..8eb9b4f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedResequencerMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Expression to use for re-ordering the messages, such as a header with a sequence number")
+    String getExpression();
+
+    @ManagedAttribute(description = "The size of the batch to be re-ordered. The default size is 100.")
+    Integer getBatchSize();
+
+    @ManagedAttribute(description = "Minimum time to wait for missing elements (messages).")
+    Long getTimeout();
+
+    @ManagedAttribute(description = "Whether to allow duplicates.")
+    Boolean isAllowDuplicates();
+
+    @ManagedAttribute(description = "Whether to reverse the ordering.")
+    Boolean isReverse();
+
+    @ManagedAttribute(description = "Whether to ignore invalid exchanges")
+    Boolean isIgnoreInvalidExchanges();
+
+    @ManagedAttribute(description = "The capacity of the resequencer's inbound queue")
+    Integer getCapacity();
+
+    @ManagedAttribute(description = "If true, throws an exception when messages older than the last delivered message are processed")
+    Boolean isRejectOld();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index b00f746..661f914 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -52,6 +52,7 @@ import org.apache.camel.management.mbean.ManagedPollEnricher;
 import org.apache.camel.management.mbean.ManagedProcessor;
 import org.apache.camel.management.mbean.ManagedProducer;
 import org.apache.camel.management.mbean.ManagedRecipientList;
+import org.apache.camel.management.mbean.ManagedResequencer;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedRoutingSlip;
 import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
@@ -77,9 +78,11 @@ import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.RoutingSlip;
 import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.SendProcessor;
+import org.apache.camel.processor.StreamResequencer;
 import org.apache.camel.processor.Throttler;
 import org.apache.camel.processor.ThroughputLogger;
 import org.apache.camel.processor.WireTapProcessor;
@@ -226,6 +229,10 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
                 answer = new ManagedRecipientList(context, (RecipientList) target, definition);
             } else if (target instanceof MulticastProcessor) {
                 answer = new ManagedMulticast(context, (MulticastProcessor) target, definition);
+            } else if (target instanceof Resequencer) {
+                answer = new ManagedResequencer(context, (Resequencer) target, definition);
+            } else if (target instanceof StreamResequencer) {
+                answer = new ManagedResequencer(context, (StreamResequencer) target, definition);
             } else if (target instanceof WireTapProcessor) {
                 answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition);
             } else if (target instanceof SendDynamicProcessor) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
new file mode 100644
index 0000000..a3429cc
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedResequencerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.Resequencer;
+import org.apache.camel.processor.StreamResequencer;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed Resequencer")
+public class ManagedResequencer extends ManagedProcessor implements ManagedResequencerMBean {
+    private final Resequencer processor;
+    private final StreamResequencer streamProcessor;
+    private final String expression;
+
+    public ManagedResequencer(CamelContext context, Resequencer processor, ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+        this.streamProcessor = null;
+        this.expression = processor.getExpression().toString();
+    }
+
+    public ManagedResequencer(CamelContext context, StreamResequencer processor, ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = null;
+        this.streamProcessor = processor;
+        this.expression = streamProcessor.getExpression().toString();
+    }
+
+    @Override
+    public String getExpression() {
+        return expression;
+    }
+
+    @Override
+    public Integer getBatchSize() {
+        if (processor != null) {
+            return processor.getBatchSize();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Long getTimeout() {
+        if (processor != null) {
+            return processor.getBatchTimeout();
+        } else {
+            return streamProcessor.getTimeout();
+        }
+    }
+
+    @Override
+    public Boolean isAllowDuplicates() {
+        if (processor != null) {
+            return processor.isAllowDuplicates();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Boolean isReverse() {
+        if (processor != null) {
+            return processor.isReverse();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Boolean isIgnoreInvalidExchanges() {
+        if (processor != null) {
+            return processor.isIgnoreInvalidExchanges();
+        } else {
+            return streamProcessor.isIgnoreInvalidExchanges();
+        }
+    }
+
+    @Override
+    public Integer getCapacity() {
+        if (processor != null) {
+            return null;
+        } else {
+            return streamProcessor.getCapacity();
+        }
+    }
+
+    @Override
+    public Boolean isRejectOld() {
+        if (processor != null) {
+            return null;
+        } else {
+            return streamProcessor.isRejectOld();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
index a2fbac8..402790e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java
@@ -27,7 +27,6 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.ConvertBodyProcessor;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -103,7 +102,6 @@ public class ConvertBodyDefinition extends NoOutputDefinition<ConvertBodyDefinit
     /**
      * The java type to convert to
      */
-    @Required
     public void setType(String type) {
         this.type = type;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index 7dd1699..c8f1616 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -28,7 +28,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.FilterProcessor;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -66,7 +65,6 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
         return expression;
     }
 
-    @Required
     public void setExpression(ExpressionDefinition expression) {
         this.expression = expression;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
index 3c1fc1f..bbb8428 100644
--- a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java
@@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -37,7 +36,7 @@ import org.apache.camel.util.ObjectHelper;
 @XmlRootElement(name = "from")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class FromDefinition extends OptionalIdentifiedDefinition<FromDefinition> implements EndpointRequiredDefinition {
-    @XmlAttribute
+    @XmlAttribute @Metadata(required = "true")
     private String uri;
     @XmlAttribute
     @Deprecated
@@ -96,7 +95,6 @@ public class FromDefinition extends OptionalIdentifiedDefinition<FromDefinition>
      *
      * @param uri the endpoint URI to use
      */
-    @Required
     public void setUri(String uri) {
         clear();
         this.uri = uri;

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
index b5f9f44..7d2b818 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java
@@ -28,7 +28,6 @@ import org.apache.camel.Service;
 import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.processor.DelegateSyncProcessor;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -84,7 +83,6 @@ public class ProcessDefinition extends NoOutputDefinition<ProcessDefinition> {
     /**
      * Reference to the {@link Processor} to lookup in the registry to use.
      */
-    @Required
     public void setRef(String ref) {
         this.ref = ref;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
index f74abee..7f26a4f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.StreamResequencer;
 import org.apache.camel.processor.resequencer.ExpressionResultComparator;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -61,8 +60,7 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
     private BatchResequencerConfig batchConfig;
     @XmlTransient
     private StreamResequencerConfig streamConfig;
-    @XmlElementRef
-    @Required
+    @XmlElementRef @Metadata(required = "true")
     private ExpressionDefinition expression;
     @XmlElementRef
     private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
@@ -364,6 +362,8 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
         Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, isAllowDuplicates, isReverse);
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
+        resequencer.setReverse(isReverse);
+        resequencer.setAllowDuplicates(isAllowDuplicates);
         if (config.getIgnoreInvalidExchanges() != null) {
             resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
         }
@@ -397,7 +397,7 @@ public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefiniti
         }
         comparator.setExpression(expression);
 
-        StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator);
+        StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator, expression);
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
         resequencer.setRejectOld(config.getRejectOld());

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
index a3cbd57..83e5a70 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SendDefinition.java
@@ -25,7 +25,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.spi.Required;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -36,7 +36,7 @@ import org.apache.camel.util.ObjectHelper;
  */
 @XmlAccessorType(XmlAccessType.FIELD)
 public abstract class SendDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputDefinition<Type> implements EndpointRequiredDefinition {
-    @XmlAttribute
+    @XmlAttribute @Metadata(required = "true")
     protected String uri;
     @XmlAttribute
     @Deprecated
@@ -99,7 +99,6 @@ public abstract class SendDefinition<Type extends ProcessorDefinition<Type>> ext
      *
      * @param uri the uri of the endpoint
      */
-    @Required
     public void setUri(String uri) {
         this.uri = uri;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
index 13ebb47..6d1f794e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java
@@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.SetHeaderProcessor;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -88,7 +87,6 @@ public class SetHeaderDefinition extends NoOutputExpressionNode {
     /**
      * Name of message header to set a new value
      */
-    @Required
     public void setHeaderName(String headerName) {
         this.headerName = headerName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
index dc405f1..5493f08 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java
@@ -26,7 +26,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.ProcessorBuilder;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -40,7 +39,6 @@ import org.apache.camel.util.ObjectHelper;
 @XmlAccessorType(XmlAccessType.FIELD)
 @Deprecated
 public class SetOutHeaderDefinition extends NoOutputExpressionNode {
-    @Deprecated
     @XmlAttribute(required = true)
     private String headerName;
     
@@ -86,7 +84,6 @@ public class SetOutHeaderDefinition extends NoOutputExpressionNode {
     /**
      * Name of message header to set a new value
      */
-    @Required
     public void setHeaderName(String headerName) {
         this.headerName = headerName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
index eccb3d2..a0c1376 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java
@@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.SetPropertyProcessor;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
 
@@ -88,7 +87,6 @@ public class SetPropertyDefinition extends NoOutputExpressionNode {
     /**
      * Name of exchange property to set a new value
      */
-    @Required
     public void setPropertyName(String propertyName) {
         this.propertyName = propertyName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
index bd3f4e9..cf8b1a5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.Predicate;
 import org.apache.camel.model.OtherAttributesAware;
 import org.apache.camel.spi.Language;
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.Required;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CollectionStringBuffer;
 import org.apache.camel.util.ExpressionToPredicateAdapter;
@@ -56,7 +55,7 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu
     @XmlAttribute
     @XmlID
     private String id;
-    @XmlValue
+    @XmlValue @Metadata(required = "true")
     private String expression;
     @XmlAttribute @Metadata(defaultValue = "true")
     private Boolean trim;
@@ -209,7 +208,6 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu
     /**
      * The expression value in your chosen language syntax
      */
-    @Required
     public void setExpression(String expression) {
         this.expression = expression;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
index b3c8f05..797c89f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
@@ -68,6 +68,8 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
     private boolean groupExchanges;
     private boolean batchConsumer;
     private boolean ignoreInvalidExchanges;
+    private boolean reverse;
+    private boolean allowDuplicates;
     private Predicate completionPredicate;
     private Expression expression;
 
@@ -100,6 +102,12 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
 
     // Properties
     // -------------------------------------------------------------------------
+
+
+    public Expression getExpression() {
+        return expression;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }
@@ -176,6 +184,22 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na
         this.ignoreInvalidExchanges = ignoreInvalidExchanges;
     }
 
+    public boolean isReverse() {
+        return reverse;
+    }
+
+    public void setReverse(boolean reverse) {
+        this.reverse = reverse;
+    }
+
+    public boolean isAllowDuplicates() {
+        return allowDuplicates;
+    }
+
+    public void setAllowDuplicates(boolean allowDuplicates) {
+        this.allowDuplicates = allowDuplicates;
+    }
+
     public Predicate getCompletionPredicate() {
         return completionPredicate;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 71af68f..e24b056 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -28,6 +28,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
@@ -76,6 +77,7 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
     private final ExceptionHandler exceptionHandler;
     private final ResequencerEngine<Exchange> engine;
     private final Processor processor;
+    private final Expression expression;
     private Delivery delivery;
     private int capacity;
     private boolean ignoreInvalidExchanges;
@@ -86,15 +88,20 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
      * @param processor next processor that processes re-ordered exchanges.
      * @param comparator a sequence element comparator for exchanges.
      */
-    public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator) {
+    public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator, Expression expression) {
         ObjectHelper.notNull(camelContext, "CamelContext");
         this.camelContext = camelContext;
         this.engine = new ResequencerEngine<Exchange>(comparator);
         this.engine.setSequenceSender(this);
         this.processor = processor;
+        this.expression = expression;
         this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
+    public Expression getExpression() {
+        return expression;
+    }
+
     /**
      * Returns this resequencer's exception handler.
      */
@@ -150,6 +157,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender<
         engine.setRejectOld(rejectOld);
     }
 
+    public boolean isRejectOld() {
+        return engine.getRejectOld() != null && engine.getRejectOld();
+    }
+
     /**
      * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
new file mode 100644
index 0000000..8d5ce62
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class ManagedResequencerTest extends ManagementTestSupport {
+
+    public void testManageResequencer() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+        foo.expectedBodiesReceived("A", "B", "C");
+
+        template.sendBodyAndHeader("direct:start", "B", "num", "2");
+        template.sendBodyAndHeader("direct:start", "C", "num", "3");
+        template.sendBodyAndHeader("direct:start", "A", "num", "1");
+
+        assertMockEndpointsSatisfied();
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the delayer
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+        // should be on route1
+        String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+        assertEquals("route1", routeId);
+
+        String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+        assertEquals("camel-1", camelId);
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        String uri = (String) mbeanServer.getAttribute(on, "Expression");
+        assertEquals("header(num)", uri);
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "BatchSize");
+        assertEquals(3, size.intValue());
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(2, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(5, data.size());
+
+        String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
+        assertNotNull(json);
+        assertTrue(json.contains("\"description\": \"Resequences (re-order) messages based on an expression"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .resequence(header("num")).size(3).id("mysend")
+                        .to("mock:foo");
+            }
+        };
+    }
+
+}


[2/2] camel git commit: CAMEL-8526: Add more EIP as specialized mbeans

Posted by da...@apache.org.
CAMEL-8526: Add more EIP as specialized mbeans


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/edb9fd30
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/edb9fd30
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/edb9fd30

Branch: refs/heads/master
Commit: edb9fd303831f9477dab5576cd463f56d633165c
Parents: 3f3d974
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jul 21 17:16:41 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jul 21 17:16:41 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedSamplingThrottlerMBean.java    | 32 +++++++++++
 .../DefaultManagementObjectStrategy.java        |  4 ++
 .../mbean/ManagedSamplingThrottler.java         | 57 ++++++++++++++++++++
 .../camel/processor/SamplingThrottler.java      | 12 +++++
 4 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
new file mode 100644
index 0000000..9b4b718
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSamplingThrottlerMBean.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedSamplingThrottlerMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "The sample period during which only a single Exchange will pass through")
+    Long getSamplePeriod();
+
+    @ManagedAttribute(description = "The sample message count which only a single Exchange will pass through after this many received")
+    Long getMessageFrequency();
+
+    @ManagedAttribute(description = "The time units for the sample period")
+    String getTimeUnit();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index 661f914..bc1686d 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -55,6 +55,7 @@ import org.apache.camel.management.mbean.ManagedRecipientList;
 import org.apache.camel.management.mbean.ManagedResequencer;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedRoutingSlip;
+import org.apache.camel.management.mbean.ManagedSamplingThrottler;
 import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
 import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
 import org.apache.camel.management.mbean.ManagedSendProcessor;
@@ -80,6 +81,7 @@ import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.Resequencer;
 import org.apache.camel.processor.RoutingSlip;
+import org.apache.camel.processor.SamplingThrottler;
 import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.StreamResequencer;
@@ -229,6 +231,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
                 answer = new ManagedRecipientList(context, (RecipientList) target, definition);
             } else if (target instanceof MulticastProcessor) {
                 answer = new ManagedMulticast(context, (MulticastProcessor) target, definition);
+            } else if (target instanceof SamplingThrottler) {
+                answer = new ManagedSamplingThrottler(context, (SamplingThrottler) target, definition);
             } else if (target instanceof Resequencer) {
                 answer = new ManagedResequencer(context, (Resequencer) target, definition);
             } else if (target instanceof StreamResequencer) {

http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
new file mode 100644
index 0000000..5e5cac8
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSamplingThrottler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Locale;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedSamplingThrottlerMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.SamplingThrottler;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed SamplingThrottler")
+public class ManagedSamplingThrottler extends ManagedProcessor implements ManagedSamplingThrottlerMBean {
+    private final SamplingThrottler processor;
+
+    public ManagedSamplingThrottler(CamelContext context, SamplingThrottler processor, ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    @Override
+    public Long getSamplePeriod() {
+        return processor.getSamplePeriod();
+    }
+
+    @Override
+    public Long getMessageFrequency() {
+        return processor.getMessageFrequency();
+    }
+
+    @Override
+    public String getTimeUnit() {
+        if (processor.getUnits() != null) {
+            return processor.getUnits().toString().toLowerCase(Locale.ENGLISH);
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/edb9fd30/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
index 661646c..9d6bbfe 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
@@ -102,6 +102,18 @@ public class SamplingThrottler extends DelegateAsyncProcessor implements Traceab
         }
     }
 
+    public long getMessageFrequency() {
+        return messageFrequency;
+    }
+
+    public long getSamplePeriod() {
+        return samplePeriod;
+    }
+
+    public TimeUnit getUnits() {
+        return units;
+    }
+
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         boolean doSend = false;