You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by "Noah Nordrum (JIRA)" <ji...@apache.org> on 2007/08/14 01:42:22 UTC

[jira] Created: (CAMEL-101) SquelchBuilder

SquelchBuilder
--------------

                 Key: CAMEL-101
                 URL: https://issues.apache.org/activemq/browse/CAMEL-101
             Project: Apache Camel
          Issue Type: New Feature
          Components: camel-core
            Reporter: Noah Nordrum


feel free to break out the inner class too...

builder to limit throughput on a given route.



package org.apache.camel.builder;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;

public class SquelchBuilder extends FromBuilder {
    private long minMessageSpacingInMs;


    public SquelchBuilder(FromBuilder parent, long minMessageSpacingInMs) {
        super(parent);
        this.minMessageSpacingInMs = minMessageSpacingInMs;
    }

    public SquelchProcessor createProcessor() throws Exception {
        // lets create a single processor for all child predicates
        final Processor childProcessor = super.createProcessor();
        return new SquelchProcessor(minMessageSpacingInMs, childProcessor);
    }

    class SquelchProcessor extends ServiceSupport implements Processor {
        private long minMessageSpacingInMs;
        private Processor processor;
        private long nextLetThroughTime;

        public SquelchProcessor(long minMessageSpacingInMs, Processor processor) {
            this.minMessageSpacingInMs = minMessageSpacingInMs;
            this.processor = processor;
        }

        public void process(Exchange exchange) throws Exception {
            final long now = System.currentTimeMillis();
            if (now < nextLetThroughTime) {
                final long sleepTime = nextLetThroughTime - now;
                Thread.sleep(sleepTime);
            }
            processor.process(exchange);
            nextLetThroughTime = System.currentTimeMillis() + minMessageSpacingInMs; // reset the now
        }

        protected void doStart() throws Exception {
            ServiceHelper.startServices(processor);
        }

        protected void doStop() throws Exception {
            ServiceHelper.stopServices(processor);
        }
    }
}




add this method in FromBuilder (might need to change the annotations, not sure how they affect things...)
    @Fluent
    public SquelchBuilder squelch(
            @FluentArg(value = "minMessageSpacingInMs", element = true)
            long minMessageSpacingInMs) {
        SquelchBuilder answer = new SquelchBuilder(this, minMessageSpacingInMs);
        addProcessBuilder(answer);
        return answer;
    }


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (CAMEL-101) SquelchBuilder

Posted by "Hadrian Zbarcea (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/activemq/browse/CAMEL-101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hadrian Zbarcea updated CAMEL-101:
----------------------------------

    Fix Version/s: 1.2.0

> SquelchBuilder
> --------------
>
>                 Key: CAMEL-101
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-101
>             Project: Apache Camel
>          Issue Type: New Feature
>          Components: camel-core
>            Reporter: Noah Nordrum
>             Fix For: 1.2.0
>
>
> feel free to break out the inner class too...
> builder to limit throughput on a given route.
> package org.apache.camel.builder;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.impl.ServiceSupport;
> import org.apache.camel.util.ServiceHelper;
> public class SquelchBuilder extends FromBuilder {
>     private long minMessageSpacingInMs;
>     public SquelchBuilder(FromBuilder parent, long minMessageSpacingInMs) {
>         super(parent);
>         this.minMessageSpacingInMs = minMessageSpacingInMs;
>     }
>     public SquelchProcessor createProcessor() throws Exception {
>         // lets create a single processor for all child predicates
>         final Processor childProcessor = super.createProcessor();
>         return new SquelchProcessor(minMessageSpacingInMs, childProcessor);
>     }
>     class SquelchProcessor extends ServiceSupport implements Processor {
>         private long minMessageSpacingInMs;
>         private Processor processor;
>         private long nextLetThroughTime;
>         public SquelchProcessor(long minMessageSpacingInMs, Processor processor) {
>             this.minMessageSpacingInMs = minMessageSpacingInMs;
>             this.processor = processor;
>         }
>         public void process(Exchange exchange) throws Exception {
>             final long now = System.currentTimeMillis();
>             if (now < nextLetThroughTime) {
>                 final long sleepTime = nextLetThroughTime - now;
>                 Thread.sleep(sleepTime);
>             }
>             processor.process(exchange);
>             nextLetThroughTime = System.currentTimeMillis() + minMessageSpacingInMs; // reset the now
>         }
>         protected void doStart() throws Exception {
>             ServiceHelper.startServices(processor);
>         }
>         protected void doStop() throws Exception {
>             ServiceHelper.stopServices(processor);
>         }
>     }
> }
> add this method in FromBuilder (might need to change the annotations, not sure how they affect things...)
>     @Fluent
>     public SquelchBuilder squelch(
>             @FluentArg(value = "minMessageSpacingInMs", element = true)
>             long minMessageSpacingInMs) {
>         SquelchBuilder answer = new SquelchBuilder(this, minMessageSpacingInMs);
>         addProcessBuilder(answer);
>         return answer;
>     }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Closed: (CAMEL-101) SquelchBuilder

Posted by "Noah Nordrum (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/activemq/browse/CAMEL-101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Noah Nordrum closed CAMEL-101.
------------------------------

    Resolution: Duplicate

[3:42pm] nnordrum: hey chirino_m, does Camel have a "squaker", i.e. only let 5 messages through a second
[3:42pm] chirino_m: don't recall off the to of my head.
[3:43pm] nnordrum: should be pretty simple to do though... would you do it as a component, or a processor?
[3:44pm] chirino_m: could be either.
[3:44pm] chirino_m: but I think a component is more reasonable
[3:44pm] chirino_m: since it acts like a queue.
[3:44pm] nnordrum: that's what I was thinking...
[3:44pm] chirino_m: you might want to even extend the seda component


just discovered there's a throttler...

> SquelchBuilder
> --------------
>
>                 Key: CAMEL-101
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-101
>             Project: Apache Camel
>          Issue Type: New Feature
>          Components: camel-core
>            Reporter: Noah Nordrum
>
> feel free to break out the inner class too...
> builder to limit throughput on a given route.
> package org.apache.camel.builder;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.impl.ServiceSupport;
> import org.apache.camel.util.ServiceHelper;
> public class SquelchBuilder extends FromBuilder {
>     private long minMessageSpacingInMs;
>     public SquelchBuilder(FromBuilder parent, long minMessageSpacingInMs) {
>         super(parent);
>         this.minMessageSpacingInMs = minMessageSpacingInMs;
>     }
>     public SquelchProcessor createProcessor() throws Exception {
>         // lets create a single processor for all child predicates
>         final Processor childProcessor = super.createProcessor();
>         return new SquelchProcessor(minMessageSpacingInMs, childProcessor);
>     }
>     class SquelchProcessor extends ServiceSupport implements Processor {
>         private long minMessageSpacingInMs;
>         private Processor processor;
>         private long nextLetThroughTime;
>         public SquelchProcessor(long minMessageSpacingInMs, Processor processor) {
>             this.minMessageSpacingInMs = minMessageSpacingInMs;
>             this.processor = processor;
>         }
>         public void process(Exchange exchange) throws Exception {
>             final long now = System.currentTimeMillis();
>             if (now < nextLetThroughTime) {
>                 final long sleepTime = nextLetThroughTime - now;
>                 Thread.sleep(sleepTime);
>             }
>             processor.process(exchange);
>             nextLetThroughTime = System.currentTimeMillis() + minMessageSpacingInMs; // reset the now
>         }
>         protected void doStart() throws Exception {
>             ServiceHelper.startServices(processor);
>         }
>         protected void doStop() throws Exception {
>             ServiceHelper.stopServices(processor);
>         }
>     }
> }
> add this method in FromBuilder (might need to change the annotations, not sure how they affect things...)
>     @Fluent
>     public SquelchBuilder squelch(
>             @FluentArg(value = "minMessageSpacingInMs", element = true)
>             long minMessageSpacingInMs) {
>         SquelchBuilder answer = new SquelchBuilder(this, minMessageSpacingInMs);
>         addProcessBuilder(answer);
>         return answer;
>     }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.