You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Willem Jiang <wi...@gmail.com> on 2009/01/22 04:12:54 UTC

Re: svn commit: r736287 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/browse/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/model/loadbalancer/ camel-core/src/main/java/org/apache/camel/...

Hi Claus,

I'm sorry we need to revert this change because I did not take a good
look at the LoadBalancerConsumer.

For the browser endpoint , if there are more than one
from(BROWSER_ENDPOINT)... rule in the camel contextg, current change
doesn't work. In this case the last from(BROWSER_ENDPOINT) will be called.

LoadBalancerConusumer give us ability to set difference LoadBalancer on
the consumers of this endpoint.

>From this respect we can't remove the TopicLoadBalancer.

I will clean it up.

Willem

davsclaus@apache.org wrote:
> Author: davsclaus
> Date: Wed Jan 21 03:29:52 2009
> New Revision: 736287
> 
> URL: http://svn.apache.org/viewvc?rev=736287&view=rev
> Log:
> Removed @deprecated TopicLoadBalancer. Use Multicast instead.
> 
> Removed:
>     camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
> Modified:
>     camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
>     camel/trunk/camel-core/src/main/resources/org/apache/camel/model/loadbalancer/jaxb.index
>     camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
>     camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
> 
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java?rev=736287&r1=736286&r2=736287&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java Wed Jan 21 03:29:52 2009
> @@ -16,8 +16,6 @@
>   */
>  package org.apache.camel.component.browse;
>  
> -import java.beans.PropertyChangeListener;
> -import java.beans.PropertyChangeSupport;
>  import java.util.List;
>  import java.util.concurrent.CopyOnWriteArrayList;
>  
> @@ -28,10 +26,9 @@
>  import org.apache.camel.Processor;
>  import org.apache.camel.Producer;
>  import org.apache.camel.Service;
> +import org.apache.camel.impl.DefaultConsumer;
>  import org.apache.camel.impl.DefaultEndpoint;
>  import org.apache.camel.impl.DefaultProducer;
> -import org.apache.camel.processor.loadbalancer.LoadBalancerConsumer;
> -import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
>  import org.apache.camel.spi.BrowsableEndpoint;
>  
>  /**
> @@ -42,9 +39,10 @@
>   */
>  public class BrowseEndpoint extends DefaultEndpoint implements BrowsableEndpoint, Service {
>      private List<Exchange> exchanges;
> -    private TopicLoadBalancer loadBalancer = new TopicLoadBalancer();
> -    // TODO: firing of property changes not implemented
> -    private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
> +    private Processor processor;
> +
> +    public BrowseEndpoint() {
> +    }
>  
>      public BrowseEndpoint(String uri, CamelContext camelContext) {
>          super(uri, camelContext);
> @@ -66,18 +64,6 @@
>          return exchanges;
>      }
>  
> -    public TopicLoadBalancer getLoadBalancer() {
> -        return loadBalancer;
> -    }
> -
> -    public void addPropertyChangeListener(PropertyChangeListener listener) {
> -        propertyChangeSupport.addPropertyChangeListener(listener);
> -    }
> -
> -    public void removePropertyChangeListener(PropertyChangeListener listener) {
> -        propertyChangeSupport.removePropertyChangeListener(listener);
> -    }
> -
>      public Producer createProducer() throws Exception {
>          return new DefaultProducer(this) {
>              public void process(Exchange exchange) throws Exception {
> @@ -87,7 +73,8 @@
>      }
>  
>      public Consumer createConsumer(Processor processor) throws Exception {
> -        return new LoadBalancerConsumer(this, processor, loadBalancer);
> +        this.processor = processor;
> +        return new DefaultConsumer(this, processor);
>      }
>  
>      protected List<Exchange> createExchangeList() {
> @@ -101,10 +88,13 @@
>       * @throws Exception is thrown if failed to process the exchange
>       */
>      protected void onExchange(Exchange exchange) throws Exception {
> -        exchanges.add(exchange);
> +        // add a copy of the Exchange at the given time
> +        exchanges.add(exchange.copy());
>  
> -        // lets fire any consumers
> -        loadBalancer.process(exchange);
> +        // lets continue processing if there are any consumers
> +        if (processor != null) {
> +            processor.process(exchange);
> +        }
>      }
>  
>      public void start() throws Exception {
> 
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java?rev=736287&r1=736286&r2=736287&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java Wed Jan 21 03:29:52 2009
> @@ -34,13 +34,11 @@
>  import org.apache.camel.model.loadbalancer.RandomLoadBalanceStrategy;
>  import org.apache.camel.model.loadbalancer.RoundRobinLoadBalanceStrategy;
>  import org.apache.camel.model.loadbalancer.StickyLoadBalanceStrategy;
> -import org.apache.camel.model.loadbalancer.TopicLoadBalanceStrategy;
>  import org.apache.camel.processor.SendProcessor;
>  import org.apache.camel.processor.loadbalancer.LoadBalancer;
>  import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
>  import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
>  import org.apache.camel.processor.loadbalancer.StickyLoadBalancer;
> -import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
>  import org.apache.camel.spi.RouteContext;
>  import org.apache.camel.util.CollectionStringBuffer;
>  
> @@ -56,8 +54,7 @@
>      @XmlElements({
>          @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalanceStrategy.class),
>          @XmlElement(required = false, name = "random", type = RandomLoadBalanceStrategy.class),
> -        @XmlElement(required = false, name = "sticky", type = StickyLoadBalanceStrategy.class),
> -        @XmlElement(required = false, name = "topic", type = TopicLoadBalanceStrategy.class)}
> +        @XmlElement(required = false, name = "sticky", type = StickyLoadBalanceStrategy.class)}
>          )
>      private LoadBalancerType loadBalancerType;
>  
> @@ -176,16 +173,6 @@
>          return this;
>      }
>  
> -    /**
> -     * Uses topic load balancer
> -     * 
> -     * @return the builder
> -     */
> -    public LoadBalanceType topic() {
> -        loadBalancerType = new LoadBalancerType(new TopicLoadBalancer());
> -        return this;
> -    }
> -
>      @Override
>      public String getLabel() {
>          CollectionStringBuffer buffer = new CollectionStringBuffer();
> 
> Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/loadbalancer/jaxb.index
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/loadbalancer/jaxb.index?rev=736287&r1=736286&r2=736287&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/loadbalancer/jaxb.index (original)
> +++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/loadbalancer/jaxb.index Wed Jan 21 03:29:52 2009
> @@ -17,5 +17,4 @@
>  LoadBalancerType
>  RandomLoadBalanceStrategy
>  RoundRobinLoadBalanceStrategy
> -StickyLoadBalanceStrategy
> -TopicLoadBalanceStrategy
> \ No newline at end of file
> +StickyLoadBalanceStrategy
> \ No newline at end of file
> 
> Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java?rev=736287&r1=736286&r2=736287&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java (original)
> +++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/component/event/EventEndpoint.java Wed Jan 21 03:29:52 2009
> @@ -16,22 +16,22 @@
>   */
>  package org.apache.camel.component.event;
>  
> +import java.util.ArrayList;
> +
>  import org.apache.camel.Exchange;
>  import org.apache.camel.NoTypeConversionAvailableException;
>  import org.apache.camel.Processor;
>  import org.apache.camel.Producer;
>  import org.apache.camel.impl.DefaultEndpoint;
>  import org.apache.camel.impl.DefaultProducer;
> -import org.apache.camel.processor.loadbalancer.LoadBalancer;
> -import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
> +import org.apache.camel.processor.MulticastProcessor;
>  import org.apache.camel.util.ObjectHelper;
> +import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
>  import org.springframework.beans.BeansException;
>  import org.springframework.context.ApplicationContext;
>  import org.springframework.context.ApplicationContextAware;
>  import org.springframework.context.ApplicationEvent;
>  
> -import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
> -
>  
>  /**
>   * An <a href="http://activemq.apache.org/camel/event.html">Event Endpoint</a>
> @@ -40,7 +40,7 @@
>   * @version $Revision$
>   */
>  public class EventEndpoint extends DefaultEndpoint implements ApplicationContextAware {
> -    private LoadBalancer loadBalancer;
> +    private MulticastProcessor processor;
>      private ApplicationContext applicationContext;
>  
>      public EventEndpoint(String endpointUri, EventComponent component) {
> @@ -82,35 +82,27 @@
>          Exchange exchange = createExchange();
>          exchange.getIn().setBody(event);
>          try {
> -            getLoadBalancer().process(exchange);
> +            getMulticastProcessor().process(exchange);
>          } catch (Exception e) {
>              throw wrapRuntimeCamelException(e);
>          }
>      }
>  
> -    public LoadBalancer getLoadBalancer() {
> -        if (loadBalancer == null) {
> -            loadBalancer = createLoadBalancer();
> +    protected synchronized MulticastProcessor getMulticastProcessor() {
> +        if (processor == null) {
> +            processor = new MulticastProcessor(new ArrayList<Processor>());
>          }
> -        return loadBalancer;
> -    }
> -
> -    public void setLoadBalancer(LoadBalancer loadBalancer) {
> -        this.loadBalancer = loadBalancer;
> +        return processor;
>      }
>  
>      // Implementation methods
>      // -------------------------------------------------------------------------
>      public synchronized void consumerStarted(EventConsumer consumer) {
> -        getLoadBalancer().addProcessor(consumer.getProcessor());
> +        getMulticastProcessor().getProcessors().add(consumer.getProcessor());
>      }
>  
>      public synchronized void consumerStopped(EventConsumer consumer) {
> -        getLoadBalancer().removeProcessor(consumer.getProcessor());
> -    }
> -
> -    protected LoadBalancer createLoadBalancer() {
> -        return new TopicLoadBalancer();
> +        getMulticastProcessor().getProcessors().remove(consumer.getProcessor());
>      }
>  
>      protected ApplicationEvent toApplicationEvent(Exchange exchange) {
> 
> Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java?rev=736287&r1=736286&r2=736287&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java (original)
> +++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java Wed Jan 21 03:29:52 2009
> @@ -38,7 +38,6 @@
>  import org.apache.camel.model.loadbalancer.RandomLoadBalanceStrategy;
>  import org.apache.camel.model.loadbalancer.RoundRobinLoadBalanceStrategy;
>  import org.apache.camel.model.loadbalancer.StickyLoadBalanceStrategy;
> -import org.apache.camel.model.loadbalancer.TopicLoadBalanceStrategy;
>  import org.apache.camel.spi.NamespaceAware;
>  import org.apache.camel.spring.CamelBeanPostProcessor;
>  import org.apache.camel.spring.CamelContextFactoryBean;
> @@ -91,7 +90,6 @@
>          addBeanDefinitionParser("roundRobin", RoundRobinLoadBalanceStrategy.class);
>          addBeanDefinitionParser("random", RandomLoadBalanceStrategy.class);
>          addBeanDefinitionParser("sticky", StickyLoadBalanceStrategy.class);
> -        addBeanDefinitionParser("topic", TopicLoadBalanceStrategy.class);
>  
>          // jmx agent
>          addBeanDefinitionParser("jmxAgent", CamelJMXAgentType.class);
> 
> 
>