You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by Ruwan Linton <ru...@gmail.com> on 2009/05/12 21:13:34 UTC

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Hiranya,

If you can make the worker pool configurable, that would be very
valuable... you may have a look at the nhttp transport thread pool, which
can be configured via the nhttp.properties file.

Thanks,
Ruwan

On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:

> Author: hiranya
> Date: Tue May 12 08:00:27 2009
> New Revision: 773818
>
> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
> Log:
> Enhancements and code cleanup in the FIX transport:
> * FIX sender now has its own worker pool and hence does not rely on the FIX
> listener any more. Therefore listener and sender can be enabled individually
> * Made FIXSessionFactory a singleton to effectively share session data
> among the listener and the sender
> * Cleanup logic for initiators during sender shutdown
> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
> 259 and similar scenarios from operating properly
>
> Modified:
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> Tue May 12 08:00:27 2009
> @@ -27,15 +27,12 @@
>  public class FIXApplicationFactory {
>
>     private ConfigurationContext cfgCtx;
> -    private WorkerPool workerPool;
> -
> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
> workerPool) {
>
> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>         this.cfgCtx = cfgCtx;
> -        this.workerPool = workerPool;
>     }
>
> -    public Application getFIXApplication(AxisService service, boolean
> acceptor) {
> +    public Application getFIXApplication(AxisService service, WorkerPool
> workerPool, boolean acceptor) {
>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
> acceptor);
>     }
>  }
> \ No newline at end of file
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> Tue May 12 08:00:27 2009
> @@ -23,6 +23,7 @@
>  import org.apache.axis2.description.AxisService;
>  import org.apache.axis2.description.Parameter;
>  import org.apache.axis2.transport.base.BaseUtils;
> +import org.apache.axis2.transport.base.threads.WorkerPool;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  import quickfix.*;
> @@ -65,16 +66,29 @@
>     /** A Map containing all the FIX applications created for initiators,
> keyed by FIX EPR */
>     private Map<String, Application> applicationStore;
>     /** An ApplicationFactory handles creating FIX Applications
> (FIXIncomingMessageHandler Objects) */
> -    private FIXApplicationFactory applicationFactory;
> +    private static FIXApplicationFactory applicationFactory = null;
> +
> +    private WorkerPool listenerThreadPool;
> +    private WorkerPool senderThreadPool;
>
>     private Log log;
>
> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
> -        this.applicationFactory = applicationFactory;
> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
> +
> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af)
> {
> +        if (applicationFactory == null) {
> +            applicationFactory = af;
> +        }
> +        return INSTANCE;
> +    }
> +
> +    private FIXSessionFactory() {
>         this.log = LogFactory.getLog(this.getClass());
>         this.acceptorStore = new HashMap<String,Acceptor>();
>         this.initiatorStore = new HashMap<String, Initiator>();
>         this.applicationStore = new HashMap<String, Application>();
> +        this.listenerThreadPool = null;
> +        this.senderThreadPool = null;
>     }
>
>     /**
> @@ -101,7 +115,7 @@
>                 MessageFactory messageFactory = new
> DefaultMessageFactory();
>                 quickfix.LogFactory logFactory = getLogFactory(service,
> settings, true);
>                 //Get a new FIX Application
> -                Application messageHandler =
> applicationFactory.getFIXApplication(service, true);
> +                Application messageHandler =
> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>                 //Create a new FIX Acceptor
>                 Acceptor acceptor = new SocketAcceptor(
>                         messageHandler,
> @@ -174,7 +188,7 @@
>         MessageStoreFactory storeFactory = getMessageStoreFactory(service,
> settings, false);
>         MessageFactory messageFactory = new DefaultMessageFactory();
>         //Get a new FIX application
> -        Application messageHandler =
> applicationFactory.getFIXApplication(service, false);
> +        Application messageHandler =
> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>
>         try {
>            //Create a new FIX initiator
> @@ -216,7 +230,7 @@
>                 MessageFactory messageFactory = new
> DefaultMessageFactory();
>                 quickfix.LogFactory logFactory = getLogFactory(service,
> settings, true);
>                 //Get a new FIX Application
> -                Application messageHandler =
> applicationFactory.getFIXApplication(service, false);
> +                Application messageHandler =
> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>
>                 Initiator initiator = new SocketInitiator(
>                     messageHandler,
> @@ -246,10 +260,10 @@
>             }
>
>         } else {
> -            String msg = "The " +
> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> -                    "not specified. Unable to initialize the initiator
> session at this stage.";
> -            log.info(msg);
> -            throw new AxisFault(msg);
> +            // FIX initiator session is not configured
> +            // It could be intentional - So not an error (we don't need
> initiators at all times)
> +            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM
> + " parameter is " +
> +                    "not specified. Unable to initialize the initiator
> session at this stage.");
>         }
>     }
>
> @@ -276,6 +290,24 @@
>     }
>
>     /**
> +     * Stops all the FIX initiators created so far and cleans up all the
> mappings
> +     * related to them
> +     */
> +    public void disposeFIXInitiators() {
> +        boolean debugEnabled = log.isDebugEnabled();
> +
> +        for (String key : initiatorStore.keySet()) {
> +            initiatorStore.get(key).stop();
> +            if (debugEnabled) {
> +                log.debug("FIX initiator to the EPR " + key + " stopped");
> +            }
> +        }
> +
> +        initiatorStore.clear();
> +        applicationStore.clear();
> +    }
> +
> +    /**
>      * Returns an array of Strings representing EPRs for the specified
> service
>      *
>      * @param serviceName the name of the service
> @@ -444,6 +476,14 @@
>         }
>         return app;
>     }
> +
> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
> +        this.listenerThreadPool = listenerThreadPool;
> +    }
> +
> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
> +        this.senderThreadPool = senderThreadPool;
> +    }
>  }
>
>
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> Tue May 12 08:00:27 2009
> @@ -60,14 +60,8 @@
>                      TransportInDescription trpInDesc) throws AxisFault {
>
>         super.init(cfgCtx, trpInDesc);
> -        //initialize the FIXSessionFactory
> -        fixSessionFactory = new FIXSessionFactory(
> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
> -
>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
> -        if (sender != null) {
> -            sender.setSessionFactory(fixSessionFactory);
> -        }
> +        fixSessionFactory = FIXSessionFactory.getInstance(new
> FIXApplicationFactory(cfgCtx));
> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>         log.info("FIX transport listener initialized...");
>     }
>
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> Tue May 12 08:00:27 2009
> @@ -28,6 +28,8 @@
>  import org.apache.axis2.transport.OutTransportInfo;
>  import org.apache.axis2.transport.base.AbstractTransportSender;
>  import org.apache.axis2.transport.base.BaseUtils;
> +import org.apache.axis2.transport.base.threads.WorkerPool;
> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>  import org.apache.commons.logging.LogFactory;
>  import quickfix.*;
>  import quickfix.field.*;
> @@ -51,17 +53,12 @@
>
>     private FIXSessionFactory sessionFactory;
>     private FIXOutgoingMessageHandler messageSender;
> +    private WorkerPool workerPool;
>
>     public FIXTransportSender() {
>         this.log = LogFactory.getLog(this.getClass());
>     }
>
> -
> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
> -        this.sessionFactory = sessionFactory;
> -        this.messageSender.setSessionFactory(sessionFactory);
> -    }
> -
>     /**
>      * @param cfgCtx       the axis2 configuration context
>      * @param transportOut the Out Transport description
> @@ -69,10 +66,25 @@
>      */
>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
> transportOut) throws AxisFault {
>         super.init(cfgCtx, transportOut);
> +        this.sessionFactory = FIXSessionFactory.getInstance(new
> FIXApplicationFactory(cfgCtx));
> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
> +                            10, 20, 5, -1, "FIX Sender Worker thread
> group", "FIX-Worker");
> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>         messageSender = new FIXOutgoingMessageHandler();
> +        messageSender.setSessionFactory(this.sessionFactory);
>         log.info("FIX transport sender initialized...");
>     }
>
> +    public void stop() {
> +        try {
> +            this.workerPool.shutdown(10000);
> +        } catch (InterruptedException e) {
> +            log.warn("Thread interrupted while waiting for worker pool to
> shut down");
> +        }
> +        sessionFactory.disposeFIXInitiators();
> +        super.stop();
> +    }
> +
>     /**
>      * Performs the actual sending of the message.
>      *
>
>
>


-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: ruwan@wso2.com; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Ruwan Linton <ru...@gmail.com>.
Yeah, I was of the mindset that these would be there as commented-out parameters
in axis2.xml, which is how we have most of the parameters. But if you look at
the things that we take from the axis2.xml, I feel these belong in a
properties file, which would also be consistent with the nhttp transport.

Thanks,
Ruwan

On Sat, May 23, 2009 at 7:01 PM, Andreas Veithen
<an...@gmail.com>wrote:

> Ruwan,
>
> I don't get your argument. Whether the properties are in axis2.xml or
> in a separate properties file, they can always be optional. Also,
> whether people think that these are frequently configurable parameters
> depends on what you write into the documentation and what you provide
> in the sample configuration files, but not on the place where these
> parameters must be specified.
>
> Andreas
>
> On Thu, May 21, 2009 at 08:46, Ruwan Linton <ru...@gmail.com>
> wrote:
> > I prefer this to be in an optional properties file, where it is not a must
> > to have this properties file, as in the nio http transport case.
> >
> > This is because the thread pool configuration shouldn't be a normal
> > configuration that users are expected to do. If we put these configurations
> > as a parameter in the axis2.xml, that might lead people to think that it is a
> > frequently configured parameter.
> >
> > So I think the properties file approach is better than the parameter
> > approach.
> >
> > Thanks,
> > Ruwan
> >
> > On Wed, May 13, 2009 at 4:12 PM, Hiranya Jayathilaka <
> hiranya911@gmail.com>
> > wrote:
> >>
> >>
> >> On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen
> >> <an...@gmail.com> wrote:
> >>>
> >>> We definitely need to make the WorkerPool created by
> >>> AbstractTransportListener configurable. The question is whether this
> >>> should be done using a property file or using parameters in
> >>> TransportInDescription (as with all other configuration settings).
> >>> What are the arguments in favor of doing it in a separate property
> >>> file?
> >>
> >> Nothing significant I guess. It might probably save us a few bytes of
> >> memory since with a file based approach we don't really have to save
> those
> >> parameters in the configuration context. But that's not a big motivating
> >> factor IMO. Maybe the only plus point worth considering is that it's
> just
> >> consistent with the existing approach for configuring thread pools. We
> >> currently use nhttp.properties to configure the HTTP-NIO thread pool.
> >>
> >> But I'm ok with using transport parameters to do this.
> >>
> >> Thanks,
> >> Hiranya
> >>
> >>>
> >>>
> >>> Andreas
> >>>
> >>> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <
> hiranya911@gmail.com>
> >>> wrote:
> >>> >
> >>> >
> >>> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <
> ruwan.linton@gmail.com>
> >>> > wrote:
> >>> >>
> >>> >> I think we need to make that configurable as well.... currently the
> >>> >> hard-coded setting will work in 98% of the cases, but there can be a
> >>> >> scenario
> >>> >> where it requires a tune up.
> >>> >>
> >>> >> Can we do this in a manner that we can configure them per transport.
> >>> >
> >>> > One simple solution would be to read a transport specific
> configuration
> >>> > file
> >>> > at AbstractTransportListener#init(). The init method gets a
> >>> > TransportInDescription object as an argument and from that we can
> >>> > retrieve
> >>> > the transport name to construct a file name unique to a given
> transport
> >>> > (eg:
> >>> > mail.properties, vfs.properties). This approach has the benefit that
> it
> >>> > doesn't require changes to any of the actual transport
> implementations.
> >>> > Everything is taken care of by the abstract class.
> >>> >
> >>> > However this class now belongs to the WS-Commons transports project.
> So
> >>> > the
> >>> > enhancement should be made there.
> >>> >
> >>> > Thanks,
> >>> > Hiranya
> >>> >
> >>> >
> >>> >>
> >>> >> Thanks,
> >>> >> Ruwan
> >>> >>
> >>> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
> >>> >> <hi...@gmail.com> wrote:
> >>> >>>
> >>> >>> Hi Ruwan,
> >>> >>>
> >>> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton
> >>> >>> <ru...@gmail.com>
> >>> >>> wrote:
> >>> >>>>
> >>> >>>> Hiranya,
> >>> >>>>
> >>> >>>> If you can make the worker pool configurable that would be of much
> >>> >>>> importance... you may have a look at the nhttp transport thread
> >>> >>>> pool, which
> >>> >>>> can be configurable via the nhttp.properties file.
> >>> >>>
> >>> >>> Currently the FIX sender initializes the WorkerPool in a manner
> >>> >>> similar
> >>> >>> to the AbstractTransportListener. The WorkerPool in
> >>> >>> AbstractTransportListener is used by several transports (JMS, Mail
> >>> >>> etc) via
> >>> >>> inheritance. FIX listener also makes use of the same thread pool.
> Do
> >>> >>> we have
> >>> >>> any plans to make that thread pool configurable too? Otherwise I
> >>> >>> don't think
> >>> >>> it makes much sense just to make the FIX sender's thread pool
> >>> >>> configurable.
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Hiranya
> >>> >>>
> >>> >>>>
> >>> >>>>
> >>> >>>> Thanks,
> >>> >>>> Ruwan
> >>> >>>>
> >>> >>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
> >>> >>>>>
> >>> >>>>> Author: hiranya
> >>> >>>>> Date: Tue May 12 08:00:27 2009
> >>> >>>>> New Revision: 773818
> >>> >>>>>
> >>> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
> >>> >>>>> Log:
> >>> >>>>> Enhancements and code cleanup in the FIX transport:
> >>> >>>>> * FIX sender now has its own worker pool and hence does not rely
> on
> >>> >>>>> the
> >>> >>>>> FIX listener any more. Therefore listener and sender can be
> enabled
> >>> >>>>> individually
> >>> >>>>> * Made FIXSessionFactory a singleton to effectively share session
> >>> >>>>> data
> >>> >>>>> among the listener and the sender
> >>> >>>>> * Cleanup logic for initiators during sender shutdown
> >>> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented
> the
> >>> >>>>> sample 259 and similar scenarios from operating properly
> >>> >>>>>
> >>> >>>>> Modified:
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>> >>>>>
> >>> >>>>> Modified:
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>> >>>>> URL:
> >>> >>>>>
> >>> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
> ==============================================================================
> >>> >>>>> ---
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>> >>>>> (original)
> >>> >>>>> +++
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>> >>>>> Tue May 12 08:00:27 2009
> >>> >>>>> @@ -27,15 +27,12 @@
> >>> >>>>>  public class FIXApplicationFactory {
> >>> >>>>>
> >>> >>>>>     private ConfigurationContext cfgCtx;
> >>> >>>>> -    private WorkerPool workerPool;
> >>> >>>>> -
> >>> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
> >>> >>>>> WorkerPool workerPool) {
> >>> >>>>>
> >>> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
> >>> >>>>>         this.cfgCtx = cfgCtx;
> >>> >>>>> -        this.workerPool = workerPool;
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>> -    public Application getFIXApplication(AxisService service,
> >>> >>>>> boolean
> >>> >>>>> acceptor) {
> >>> >>>>> +    public Application getFIXApplication(AxisService service,
> >>> >>>>> WorkerPool workerPool, boolean acceptor) {
> >>> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
> >>> >>>>> service, acceptor);
> >>> >>>>>     }
> >>> >>>>>  }
> >>> >>>>> \ No newline at end of file
> >>> >>>>>
> >>> >>>>> Modified:
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>> >>>>> URL:
> >>> >>>>>
> >>> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
> ==============================================================================
> >>> >>>>> ---
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>> >>>>> (original)
> >>> >>>>> +++
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>> >>>>> Tue May 12 08:00:27 2009
> >>> >>>>> @@ -23,6 +23,7 @@
> >>> >>>>>  import org.apache.axis2.description.AxisService;
> >>> >>>>>  import org.apache.axis2.description.Parameter;
> >>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>> >>>>>  import org.apache.commons.logging.Log;
> >>> >>>>>  import org.apache.commons.logging.LogFactory;
> >>> >>>>>  import quickfix.*;
> >>> >>>>> @@ -65,16 +66,29 @@
> >>> >>>>>     /** A Map containing all the FIX applications created for
> >>> >>>>> initiators, keyed by FIX EPR */
> >>> >>>>>     private Map<String, Application> applicationStore;
> >>> >>>>>     /** An ApplicationFactory handles creating FIX Applications
> >>> >>>>> (FIXIncomingMessageHandler Objects) */
> >>> >>>>> -    private FIXApplicationFactory applicationFactory;
> >>> >>>>> +    private static FIXApplicationFactory applicationFactory =
> >>> >>>>> null;
> >>> >>>>> +
> >>> >>>>> +    private WorkerPool listenerThreadPool;
> >>> >>>>> +    private WorkerPool senderThreadPool;
> >>> >>>>>
> >>> >>>>>     private Log log;
> >>> >>>>>
> >>> >>>>> -    public FIXSessionFactory(FIXApplicationFactory
> >>> >>>>> applicationFactory)
> >>> >>>>> {
> >>> >>>>> -        this.applicationFactory = applicationFactory;
> >>> >>>>> +    private static FIXSessionFactory INSTANCE = new
> >>> >>>>> FIXSessionFactory();
> >>> >>>>> +
> >>> >>>>> +    public static FIXSessionFactory
> >>> >>>>> getInstance(FIXApplicationFactory
> >>> >>>>> af) {
> >>> >>>>> +        if (applicationFactory == null) {
> >>> >>>>> +            applicationFactory = af;
> >>> >>>>> +        }
> >>> >>>>> +        return INSTANCE;
> >>> >>>>> +    }
> >>> >>>>> +
> >>> >>>>> +    private FIXSessionFactory() {
> >>> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
> >>> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
> >>> >>>>>         this.applicationStore = new HashMap<String,
> Application>();
> >>> >>>>> +        this.listenerThreadPool = null;
> >>> >>>>> +        this.senderThreadPool = null;
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>>     /**
> >>> >>>>> @@ -101,7 +115,7 @@
> >>> >>>>>                 MessageFactory messageFactory = new
> >>> >>>>> DefaultMessageFactory();
> >>> >>>>>                 quickfix.LogFactory logFactory =
> >>> >>>>> getLogFactory(service,
> >>> >>>>> settings, true);
> >>> >>>>>                 //Get a new FIX Application
> >>> >>>>> -                Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, true);
> >>> >>>>> +                Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool,
> >>> >>>>> true);
> >>> >>>>>                 //Create a new FIX Acceptor
> >>> >>>>>                 Acceptor acceptor = new SocketAcceptor(
> >>> >>>>>                         messageHandler,
> >>> >>>>> @@ -174,7 +188,7 @@
> >>> >>>>>         MessageStoreFactory storeFactory =
> >>> >>>>> getMessageStoreFactory(service, settings, false);
> >>> >>>>>         MessageFactory messageFactory = new
> >>> >>>>> DefaultMessageFactory();
> >>> >>>>>         //Get a new FIX application
> >>> >>>>> -        Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, false);
> >>> >>>>> +        Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
> >>> >>>>> false);
> >>> >>>>>
> >>> >>>>>         try {
> >>> >>>>>            //Create a new FIX initiator
> >>> >>>>> @@ -216,7 +230,7 @@
> >>> >>>>>                 MessageFactory messageFactory = new
> >>> >>>>> DefaultMessageFactory();
> >>> >>>>>                 quickfix.LogFactory logFactory =
> >>> >>>>> getLogFactory(service,
> >>> >>>>> settings, true);
> >>> >>>>>                 //Get a new FIX Application
> >>> >>>>> -                Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, false);
> >>> >>>>> +                Application messageHandler =
> >>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
> >>> >>>>> false);
> >>> >>>>>
> >>> >>>>>                 Initiator initiator = new SocketInitiator(
> >>> >>>>>                     messageHandler,
> >>> >>>>> @@ -246,10 +260,10 @@
> >>> >>>>>             }
> >>> >>>>>
> >>> >>>>>         } else {
> >>> >>>>> -            String msg = "The " +
> >>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>> >>>>> -                    "not specified. Unable to initialize the
> >>> >>>>> initiator
> >>> >>>>> session at this stage.";
> >>> >>>>> -            log.info(msg);
> >>> >>>>> -            throw new AxisFault(msg);
> >>> >>>>> +            // FIX initiator session is not configured
> >>> >>>>> +            // It could be intentional - So not an error (we
> don't
> >>> >>>>> need initiators at all times)
> >>> >>>>> +            log.info("The " +
> >>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>> >>>>> +                    "not specified. Unable to initialize the
> >>> >>>>> initiator
> >>> >>>>> session at this stage.");
> >>> >>>>>         }
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>> @@ -276,6 +290,24 @@
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>>     /**
> >>> >>>>> +     * Stops all the FIX initiators created so far and cleans up
> >>> >>>>> all
> >>> >>>>> the mappings
> >>> >>>>> +     * related to them
> >>> >>>>> +     */
> >>> >>>>> +    public void disposeFIXInitiators() {
> >>> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
> >>> >>>>> +
> >>> >>>>> +        for (String key : initiatorStore.keySet()) {
> >>> >>>>> +            initiatorStore.get(key).stop();
> >>> >>>>> +            if (debugEnabled) {
> >>> >>>>> +                log.debug("FIX initiator to the EPR " + key + "
> >>> >>>>> stopped");
> >>> >>>>> +            }
> >>> >>>>> +        }
> >>> >>>>> +
> >>> >>>>> +        initiatorStore.clear();
> >>> >>>>> +        applicationStore.clear();
> >>> >>>>> +    }
> >>> >>>>> +
> >>> >>>>> +    /**
> >>> >>>>>      * Returns an array of Strings representing EPRs for the
> >>> >>>>> specified
> >>> >>>>> service
> >>> >>>>>      *
> >>> >>>>>      * @param serviceName the name of the service
> >>> >>>>> @@ -444,6 +476,14 @@
> >>> >>>>>         }
> >>> >>>>>         return app;
> >>> >>>>>     }
> >>> >>>>> +
> >>> >>>>> +    public void setListenerThreadPool(WorkerPool
> >>> >>>>> listenerThreadPool) {
> >>> >>>>> +        this.listenerThreadPool = listenerThreadPool;
> >>> >>>>> +    }
> >>> >>>>> +
> >>> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool)
> {
> >>> >>>>> +        this.senderThreadPool = senderThreadPool;
> >>> >>>>> +    }
> >>> >>>>>  }
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> Modified:
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>> >>>>> URL:
> >>> >>>>>
> >>> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
> ==============================================================================
> >>> >>>>> ---
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>> >>>>> (original)
> >>> >>>>> +++
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>> >>>>> Tue May 12 08:00:27 2009
> >>> >>>>> @@ -60,14 +60,8 @@
> >>> >>>>>                      TransportInDescription trpInDesc) throws
> >>> >>>>> AxisFault
> >>> >>>>> {
> >>> >>>>>
> >>> >>>>>         super.init(cfgCtx, trpInDesc);
> >>> >>>>> -        //initialize the FIXSessionFactory
> >>> >>>>> -        fixSessionFactory = new FIXSessionFactory(
> >>> >>>>> -                new FIXApplicationFactory(this.cfgCtx,
> >>> >>>>> this.workerPool));
> >>> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
> >>> >>>>> -
> >>> >>>>>
> >>> >>>>>
>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
> >>> >>>>> -        if (sender != null) {
> >>> >>>>> -            sender.setSessionFactory(fixSessionFactory);
> >>> >>>>> -        }
> >>> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
> >>> >>>>> FIXApplicationFactory(cfgCtx));
> >>> >>>>> +
>  fixSessionFactory.setListenerThreadPool(this.workerPool);
> >>> >>>>>         log.info("FIX transport listener initialized...");
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> Modified:
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>> >>>>> URL:
> >>> >>>>>
> >>> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
> >>> >>>>>
> >>> >>>>>
> >>> >>>>>
> ==============================================================================
> >>> >>>>> ---
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>> >>>>> (original)
> >>> >>>>> +++
> >>> >>>>>
> >>> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>> >>>>> Tue May 12 08:00:27 2009
> >>> >>>>> @@ -28,6 +28,8 @@
> >>> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
> >>> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
> >>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>> >>>>> +import
> org.apache.axis2.transport.base.threads.WorkerPoolFactory;
> >>> >>>>>  import org.apache.commons.logging.LogFactory;
> >>> >>>>>  import quickfix.*;
> >>> >>>>>  import quickfix.field.*;
> >>> >>>>> @@ -51,17 +53,12 @@
> >>> >>>>>
> >>> >>>>>     private FIXSessionFactory sessionFactory;
> >>> >>>>>     private FIXOutgoingMessageHandler messageSender;
> >>> >>>>> +    private WorkerPool workerPool;
> >>> >>>>>
> >>> >>>>>     public FIXTransportSender() {
> >>> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>> -
> >>> >>>>> -    public void setSessionFactory(FIXSessionFactory
> >>> >>>>> sessionFactory) {
> >>> >>>>> -        this.sessionFactory = sessionFactory;
> >>> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
> >>> >>>>> -    }
> >>> >>>>> -
> >>> >>>>>     /**
> >>> >>>>>      * @param cfgCtx       the axis2 configuration context
> >>> >>>>>      * @param transportOut the Out Transport description
> >>> >>>>> @@ -69,10 +66,25 @@
> >>> >>>>>      */
> >>> >>>>>     public void init(ConfigurationContext cfgCtx,
> >>> >>>>> TransportOutDescription transportOut) throws AxisFault {
> >>> >>>>>         super.init(cfgCtx, transportOut);
> >>> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
> >>> >>>>> FIXApplicationFactory(cfgCtx));
> >>> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
> >>> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker
> >>> >>>>> thread
> >>> >>>>> group", "FIX-Worker");
> >>> >>>>> +
>  this.sessionFactory.setSenderThreadPool(this.workerPool);
> >>> >>>>>         messageSender = new FIXOutgoingMessageHandler();
> >>> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
> >>> >>>>>         log.info("FIX transport sender initialized...");
> >>> >>>>>     }
> >>> >>>>>
> >>> >>>>> +    public void stop() {
> >>> >>>>> +        try {
> >>> >>>>> +            this.workerPool.shutdown(10000);
> >>> >>>>> +        } catch (InterruptedException e) {
> >>> >>>>> +            log.warn("Thread interrupted while waiting for
> worker
> >>> >>>>> pool
> >>> >>>>> to shut down");
> >>> >>>>> +        }
> >>> >>>>> +        sessionFactory.disposeFIXInitiators();
> >>> >>>>> +        super.stop();
> >>> >>>>> +    }
> >>> >>>>> +
> >>> >>>>>     /**
> >>> >>>>>      * Performs the actual sending of the message.
> >>> >>>>>      *
> >>> >>>>>
> >>> >>>>>
> >>> >>>>
> >>> >>>>
> >>> >>>>
> >>> >>>> --
> >>> >>>> Ruwan Linton
> >>> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
> >>> >>>> http://wso2.org/esb
> >>> >>>> WSO2 Inc.; http://wso2.org
> >>> >>>> email: ruwan@wso2.com; cell: +94 77 341 3097
> >>> >>>> blog: http://ruwansblog.blogspot.com
> >>> >>>
> >>> >>>
> >>> >>>
> >>> >>> --
> >>> >>> Hiranya Jayathilaka
> >>> >>> Software Engineer;
> >>> >>> WSO2 Inc.;  http://wso2.org
> >>> >>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> >>> >>> Blog: http://techfeast-hiranya.blogspot.com
> >>> >>
> >>> >>
> >>> >>
> >>> >> --
> >>> >> Ruwan Linton
> >>> >> Senior Software Engineer & Product Manager; WSO2 ESB;
> >>> >> http://wso2.org/esb
> >>> >> WSO2 Inc.; http://wso2.org
> >>> >> email: ruwan@wso2.com; cell: +94 77 341 3097
> >>> >> blog: http://ruwansblog.blogspot.com
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Hiranya Jayathilaka
> >>> > Software Engineer;
> >>> > WSO2 Inc.;  http://wso2.org
> >>> > E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> >>> > Blog: http://techfeast-hiranya.blogspot.com
> >>> >
> >>>
> >>> ---------------------------------------------------------------------
> >>> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
> >>> For additional commands, e-mail: dev-help@synapse.apache.org
> >>>
> >>
> >>
> >>
> >> --
> >> Hiranya Jayathilaka
> >> Software Engineer;
> >> WSO2 Inc.;  http://wso2.org
> >> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> >> Blog: http://techfeast-hiranya.blogspot.com
> >
> >
> >
> > --
> > Ruwan Linton
> > Senior Software Engineer & Product Manager; WSO2 ESB;
> http://wso2.org/esb
> > WSO2 Inc.; http://wso2.org
> > email: ruwan@wso2.com; cell: +94 77 341 3097
> > blog: http://ruwansblog.blogspot.com
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
> For additional commands, e-mail: dev-help@synapse.apache.org
>
>


-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: ruwan@wso2.com; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Andreas Veithen <an...@gmail.com>.
Ruwan,

I don't get your argument. Whether the properties are in axis2.xml or
in a separate properties file, they can always be optional. Also,
whether people think that these are frequently configurable parameters
depends on what you write into the documentation and what you provide
in the sample configuration files, but not on the place where these
parameters must be specified.

Andreas

On Thu, May 21, 2009 at 08:46, Ruwan Linton <ru...@gmail.com> wrote:
> I prefer this to be in an optional properties file, where it is not a must
> to have this properties file, as in the nio http transport case.
>
> This is because the thread pool configuration shouldn't be a normal
> configuration that the users are doing. If we put these configurations as a
> parameter in the axis2.xml that might lead people to think that it is a
> frequently configurable parameter.
>
> So I think the properties file approach is better than the parameter
> approach.
>
> Thanks,
> Ruwan
>
> On Wed, May 13, 2009 at 4:12 PM, Hiranya Jayathilaka <hi...@gmail.com>
> wrote:
>>
>>
>> On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen
>> <an...@gmail.com> wrote:
>>>
>>> We definitely need to make the WorkerPool created by
>>> AbstractTransportListener configurable. The question is whether this
>>> should be done using a property file or using parameters in
>>> TransportInDescription (as with all other configuration settings).
>>> What are the arguments in favor of doing it in a separate property
>>> file?
>>
>> Nothing significant I guess. It might probably save us a few bytes of
>> memory since with a file based approach we don't really have to save those
>> parameters in the configuration context. But that's not a big motivating
>> factor IMO. Maybe the only plus point worth considering is that it's just
>> consistent with the existing approach for configuring thread pools. We
>> currently use nhttp.properties to configure the HTTP-NIO thread pool.
>>
>> But I'm ok with using transport parameters to do this.
>>
>> Thanks,
>> Hiranya
>>
>>>
>>>
>>> Andreas
>>>
>>> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hi...@gmail.com>
>>> wrote:
>>> >
>>> >
>>> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ru...@gmail.com>
>>> > wrote:
>>> >>
>>> >> I think we need to make that configurable as well.... currently hard
>>> >> coded setting will work in 98% of the cases, but there can be a
>>> >> scenario
>>> >> where it requires a tune up.
>>> >>
>>> >> Can we do this in a manner that we can configure them per transport?
>>> >
>>> > One simple solution would be to read a transport specific configuration
>>> > file
>>> > at AbstractTransportListener#init(). The init method gets a
>>> > TransportInDescription object as an argument and from that we can
>>> > retrieve
>>> > the transport name to construct a file name unique to a given transport
>>> > (eg:
>>> > mail.properties, vfs.properties). This approach has the benefit that it
>>> > doesn't require changes to any of the actual transport implementations.
>>> > Everything is taken care of by the abstract class.
>>> >
>>> > However this class now belongs to the WS-Commons transports project. So
>>> > the
>>> > enhancement should be made there.
>>> >
>>> > Thanks,
>>> > Hiranya
>>> >
>>> >
>>> >>
>>> >> Thanks,
>>> >> Ruwan
>>> >>
>>> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>>> >> <hi...@gmail.com> wrote:
>>> >>>
>>> >>> Hi Ruwan,
>>> >>>
>>> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton
>>> >>> <ru...@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> Hiranya,
>>> >>>>
>>> >>>> If you can make the worker pool configurable that would be of much
>>> >>>> importance... you may have a look at the nhttp transport thread
>>> >>>> pool, which
>>> >>>> is configurable via the nhttp.properties file.
>>> >>>
>>> >>> Currently the FIX sender initializes the WorkerPool in a manner
>>> >>> similar
>>> >>> to the AbstractTransportListener. The WorkerPool in
>>> >>> AbstractTransportListener is used by several transports (JMS, Mail
>>> >>> etc) via
>>> >>> inheritance. FIX listener also makes use of the same thread pool. Do
>>> >>> we have
>>> >>> any plans to make that thread pool configurable too? Otherwise I
>>> >>> don't think
>>> >>> it makes much sense just to make the FIX sender's thread pool
>>> >>> configurable.
>>> >>>
>>> >>> Thanks,
>>> >>> Hiranya
>>> >>>
>>> >>>>
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Ruwan
>>> >>>>
>>> >>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>>> >>>>>
>>> >>>>> Author: hiranya
>>> >>>>> Date: Tue May 12 08:00:27 2009
>>> >>>>> New Revision: 773818
>>> >>>>>
>>> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>> >>>>> Log:
>>> >>>>> Enhancements and code cleanup in the FIX transport:
>>> >>>>> * FIX sender now has its own worker pool and hence does not rely on
>>> >>>>> the
>>> >>>>> FIX listener any more. Therefore listener and sender can be enabled
>>> >>>>> individually
>>> >>>>> * Made FIXSessionFactory a singleton to effectively share session
>>> >>>>> data
>>> >>>>> among the listener and the sender
>>> >>>>> * Cleanup logic for initiators during sender shutdown
>>> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>>> >>>>> sample 259 and similar scenarios from operating properly
>>> >>>>>
>>> >>>>> Modified:
>>> >>>>>
>>> >>>>>
>>> >>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> >>>>>
>>> >>>>>
>>> >>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> >>>>>
>>> >>>>>
>>> >>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> >>>>>
>>> >>>>>
>>> >>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> >>>>>
>>> >>>>> Modified:
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> >>>>> URL:
>>> >>>>>
>>> >>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>> >>>>>
>>> >>>>>
>>> >>>>> ==============================================================================
>>> >>>>> ---
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> >>>>> (original)
>>> >>>>> +++
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> >>>>> Tue May 12 08:00:27 2009
>>> >>>>> @@ -27,15 +27,12 @@
>>> >>>>>  public class FIXApplicationFactory {
>>> >>>>>
>>> >>>>>     private ConfigurationContext cfgCtx;
>>> >>>>> -    private WorkerPool workerPool;
>>> >>>>> -
>>> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>>> >>>>> WorkerPool workerPool) {
>>> >>>>>
>>> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>> >>>>>         this.cfgCtx = cfgCtx;
>>> >>>>> -        this.workerPool = workerPool;
>>> >>>>>     }
>>> >>>>>
>>> >>>>> -    public Application getFIXApplication(AxisService service,
>>> >>>>> boolean
>>> >>>>> acceptor) {
>>> >>>>> +    public Application getFIXApplication(AxisService service,
>>> >>>>> WorkerPool workerPool, boolean acceptor) {
>>> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>>> >>>>> service, acceptor);
>>> >>>>>     }
>>> >>>>>  }
>>> >>>>> \ No newline at end of file
>>> >>>>>
>>> >>>>> Modified:
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> >>>>> URL:
>>> >>>>>
>>> >>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>> >>>>>
>>> >>>>>
>>> >>>>> ==============================================================================
>>> >>>>> ---
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> >>>>> (original)
>>> >>>>> +++
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> >>>>> Tue May 12 08:00:27 2009
>>> >>>>> @@ -23,6 +23,7 @@
>>> >>>>>  import org.apache.axis2.description.AxisService;
>>> >>>>>  import org.apache.axis2.description.Parameter;
>>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>> >>>>>  import org.apache.commons.logging.Log;
>>> >>>>>  import org.apache.commons.logging.LogFactory;
>>> >>>>>  import quickfix.*;
>>> >>>>> @@ -65,16 +66,29 @@
>>> >>>>>     /** A Map containing all the FIX applications created for
>>> >>>>> initiators, keyed by FIX EPR */
>>> >>>>>     private Map<String, Application> applicationStore;
>>> >>>>>     /** An ApplicationFactory handles creating FIX Applications
>>> >>>>> (FIXIncomingMessageHandler Objects) */
>>> >>>>> -    private FIXApplicationFactory applicationFactory;
>>> >>>>> +    private static FIXApplicationFactory applicationFactory =
>>> >>>>> null;
>>> >>>>> +
>>> >>>>> +    private WorkerPool listenerThreadPool;
>>> >>>>> +    private WorkerPool senderThreadPool;
>>> >>>>>
>>> >>>>>     private Log log;
>>> >>>>>
>>> >>>>> -    public FIXSessionFactory(FIXApplicationFactory
>>> >>>>> applicationFactory)
>>> >>>>> {
>>> >>>>> -        this.applicationFactory = applicationFactory;
>>> >>>>> +    private static FIXSessionFactory INSTANCE = new
>>> >>>>> FIXSessionFactory();
>>> >>>>> +
>>> >>>>> +    public static FIXSessionFactory
>>> >>>>> getInstance(FIXApplicationFactory
>>> >>>>> af) {
>>> >>>>> +        if (applicationFactory == null) {
>>> >>>>> +            applicationFactory = af;
>>> >>>>> +        }
>>> >>>>> +        return INSTANCE;
>>> >>>>> +    }
>>> >>>>> +
>>> >>>>> +    private FIXSessionFactory() {
>>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>>> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>> >>>>>         this.applicationStore = new HashMap<String, Application>();
>>> >>>>> +        this.listenerThreadPool = null;
>>> >>>>> +        this.senderThreadPool = null;
>>> >>>>>     }
>>> >>>>>
>>> >>>>>     /**
>>> >>>>> @@ -101,7 +115,7 @@
>>> >>>>>                 MessageFactory messageFactory = new
>>> >>>>> DefaultMessageFactory();
>>> >>>>>                 quickfix.LogFactory logFactory =
>>> >>>>> getLogFactory(service,
>>> >>>>> settings, true);
>>> >>>>>                 //Get a new FIX Application
>>> >>>>> -                Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, true);
>>> >>>>> +                Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool,
>>> >>>>> true);
>>> >>>>>                 //Create a new FIX Acceptor
>>> >>>>>                 Acceptor acceptor = new SocketAcceptor(
>>> >>>>>                         messageHandler,
>>> >>>>> @@ -174,7 +188,7 @@
>>> >>>>>         MessageStoreFactory storeFactory =
>>> >>>>> getMessageStoreFactory(service, settings, false);
>>> >>>>>         MessageFactory messageFactory = new
>>> >>>>> DefaultMessageFactory();
>>> >>>>>         //Get a new FIX application
>>> >>>>> -        Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, false);
>>> >>>>> +        Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>>> >>>>> false);
>>> >>>>>
>>> >>>>>         try {
>>> >>>>>            //Create a new FIX initiator
>>> >>>>> @@ -216,7 +230,7 @@
>>> >>>>>                 MessageFactory messageFactory = new
>>> >>>>> DefaultMessageFactory();
>>> >>>>>                 quickfix.LogFactory logFactory =
>>> >>>>> getLogFactory(service,
>>> >>>>> settings, true);
>>> >>>>>                 //Get a new FIX Application
>>> >>>>> -                Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, false);
>>> >>>>> +                Application messageHandler =
>>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>>> >>>>> false);
>>> >>>>>
>>> >>>>>                 Initiator initiator = new SocketInitiator(
>>> >>>>>                     messageHandler,
>>> >>>>> @@ -246,10 +260,10 @@
>>> >>>>>             }
>>> >>>>>
>>> >>>>>         } else {
>>> >>>>> -            String msg = "The " +
>>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> >>>>> -                    "not specified. Unable to initialize the
>>> >>>>> initiator
>>> >>>>> session at this stage.";
>>> >>>>> -            log.info(msg);
>>> >>>>> -            throw new AxisFault(msg);
>>> >>>>> +            // FIX initiator session is not configured
>>> >>>>> +            // It could be intentional - So not an error (we don't
>>> >>>>> need initiators at all times)
>>> >>>>> +            log.info("The " +
>>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> >>>>> +                    "not specified. Unable to initialize the
>>> >>>>> initiator
>>> >>>>> session at this stage.");
>>> >>>>>         }
>>> >>>>>     }
>>> >>>>>
>>> >>>>> @@ -276,6 +290,24 @@
>>> >>>>>     }
>>> >>>>>
>>> >>>>>     /**
>>> >>>>> +     * Stops all the FIX initiators created so far and cleans up
>>> >>>>> all
>>> >>>>> the mappings
>>> >>>>> +     * related to them
>>> >>>>> +     */
>>> >>>>> +    public void disposeFIXInitiators() {
>>> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
>>> >>>>> +
>>> >>>>> +        for (String key : initiatorStore.keySet()) {
>>> >>>>> +            initiatorStore.get(key).stop();
>>> >>>>> +            if (debugEnabled) {
>>> >>>>> +                log.debug("FIX initiator to the EPR " + key + "
>>> >>>>> stopped");
>>> >>>>> +            }
>>> >>>>> +        }
>>> >>>>> +
>>> >>>>> +        initiatorStore.clear();
>>> >>>>> +        applicationStore.clear();
>>> >>>>> +    }
>>> >>>>> +
>>> >>>>> +    /**
>>> >>>>>      * Returns an array of Strings representing EPRs for the
>>> >>>>> specified
>>> >>>>> service
>>> >>>>>      *
>>> >>>>>      * @param serviceName the name of the service
>>> >>>>> @@ -444,6 +476,14 @@
>>> >>>>>         }
>>> >>>>>         return app;
>>> >>>>>     }
>>> >>>>> +
>>> >>>>> +    public void setListenerThreadPool(WorkerPool
>>> >>>>> listenerThreadPool) {
>>> >>>>> +        this.listenerThreadPool = listenerThreadPool;
>>> >>>>> +    }
>>> >>>>> +
>>> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>> >>>>> +        this.senderThreadPool = senderThreadPool;
>>> >>>>> +    }
>>> >>>>>  }
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> Modified:
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> >>>>> URL:
>>> >>>>>
>>> >>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>> >>>>>
>>> >>>>>
>>> >>>>> ==============================================================================
>>> >>>>> ---
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> >>>>> (original)
>>> >>>>> +++
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> >>>>> Tue May 12 08:00:27 2009
>>> >>>>> @@ -60,14 +60,8 @@
>>> >>>>>                      TransportInDescription trpInDesc) throws
>>> >>>>> AxisFault
>>> >>>>> {
>>> >>>>>
>>> >>>>>         super.init(cfgCtx, trpInDesc);
>>> >>>>> -        //initialize the FIXSessionFactory
>>> >>>>> -        fixSessionFactory = new FIXSessionFactory(
>>> >>>>> -                new FIXApplicationFactory(this.cfgCtx,
>>> >>>>> this.workerPool));
>>> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>> >>>>> -
>>> >>>>>
>>> >>>>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>> >>>>> -        if (sender != null) {
>>> >>>>> -            sender.setSessionFactory(fixSessionFactory);
>>> >>>>> -        }
>>> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>> >>>>> FIXApplicationFactory(cfgCtx));
>>> >>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>> >>>>>         log.info("FIX transport listener initialized...");
>>> >>>>>     }
>>> >>>>>
>>> >>>>>
>>> >>>>> Modified:
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> >>>>> URL:
>>> >>>>>
>>> >>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>> >>>>>
>>> >>>>>
>>> >>>>> ==============================================================================
>>> >>>>> ---
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> >>>>> (original)
>>> >>>>> +++
>>> >>>>>
>>> >>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> >>>>> Tue May 12 08:00:27 2009
>>> >>>>> @@ -28,6 +28,8 @@
>>> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
>>> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>> >>>>>  import org.apache.commons.logging.LogFactory;
>>> >>>>>  import quickfix.*;
>>> >>>>>  import quickfix.field.*;
>>> >>>>> @@ -51,17 +53,12 @@
>>> >>>>>
>>> >>>>>     private FIXSessionFactory sessionFactory;
>>> >>>>>     private FIXOutgoingMessageHandler messageSender;
>>> >>>>> +    private WorkerPool workerPool;
>>> >>>>>
>>> >>>>>     public FIXTransportSender() {
>>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>>> >>>>>     }
>>> >>>>>
>>> >>>>> -
>>> >>>>> -    public void setSessionFactory(FIXSessionFactory
>>> >>>>> sessionFactory) {
>>> >>>>> -        this.sessionFactory = sessionFactory;
>>> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>> >>>>> -    }
>>> >>>>> -
>>> >>>>>     /**
>>> >>>>>      * @param cfgCtx       the axis2 configuration context
>>> >>>>>      * @param transportOut the Out Transport description
>>> >>>>> @@ -69,10 +66,25 @@
>>> >>>>>      */
>>> >>>>>     public void init(ConfigurationContext cfgCtx,
>>> >>>>> TransportOutDescription transportOut) throws AxisFault {
>>> >>>>>         super.init(cfgCtx, transportOut);
>>> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>> >>>>> FIXApplicationFactory(cfgCtx));
>>> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker
>>> >>>>> thread
>>> >>>>> group", "FIX-Worker");
>>> >>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>> >>>>>         messageSender = new FIXOutgoingMessageHandler();
>>> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>> >>>>>         log.info("FIX transport sender initialized...");
>>> >>>>>     }
>>> >>>>>
>>> >>>>> +    public void stop() {
>>> >>>>> +        try {
>>> >>>>> +            this.workerPool.shutdown(10000);
>>> >>>>> +        } catch (InterruptedException e) {
>>> >>>>> +            log.warn("Thread interrupted while waiting for worker
>>> >>>>> pool
>>> >>>>> to shut down");
>>> >>>>> +        }
>>> >>>>> +        sessionFactory.disposeFIXInitiators();
>>> >>>>> +        super.stop();
>>> >>>>> +    }
>>> >>>>> +
>>> >>>>>     /**
>>> >>>>>      * Performs the actual sending of the message.
>>> >>>>>      *
>>> >>>>>
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> --
>>> >>>> Ruwan Linton
>>> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>>> >>>> http://wso2.org/esb
>>> >>>> WSO2 Inc.; http://wso2.org
>>> >>>> email: ruwan@wso2.com; cell: +94 77 341 3097
>>> >>>> blog: http://ruwansblog.blogspot.com
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Hiranya Jayathilaka
>>> >>> Software Engineer;
>>> >>> WSO2 Inc.;  http://wso2.org
>>> >>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>>> >>> Blog: http://techfeast-hiranya.blogspot.com
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Ruwan Linton
>>> >> Senior Software Engineer & Product Manager; WSO2 ESB;
>>> >> http://wso2.org/esb
>>> >> WSO2 Inc.; http://wso2.org
>>> >> email: ruwan@wso2.com; cell: +94 77 341 3097
>>> >> blog: http://ruwansblog.blogspot.com
>>> >
>>> >
>>> >
>>> > --
>>> > Hiranya Jayathilaka
>>> > Software Engineer;
>>> > WSO2 Inc.;  http://wso2.org
>>> > E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>>> > Blog: http://techfeast-hiranya.blogspot.com
>>> >
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
>>> For additional commands, e-mail: dev-help@synapse.apache.org
>>>
>>
>>
>>
>> --
>> Hiranya Jayathilaka
>> Software Engineer;
>> WSO2 Inc.;  http://wso2.org
>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>> Blog: http://techfeast-hiranya.blogspot.com
>
>
>
> --
> Ruwan Linton
> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
> WSO2 Inc.; http://wso2.org
> email: ruwan@wso2.com; cell: +94 77 341 3097
> blog: http://ruwansblog.blogspot.com
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
For additional commands, e-mail: dev-help@synapse.apache.org


Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Ruwan Linton <ru...@gmail.com>.
I prefer this to be in an optional properties file, where it is not a must
to have this properties file, as in the nio http transport case.

This is because the thread pool configuration shouldn't be a normal
configuration that the users are doing. If we put these configurations as a
parameter in the axis2.xml that might lead people to think that it is a
frequently configurable parameter.

So I think the properties file approach is better than the parameter
approach.

Thanks,
Ruwan

On Wed, May 13, 2009 at 4:12 PM, Hiranya Jayathilaka
<hi...@gmail.com> wrote:

>
>
> On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen <
> andreas.veithen@gmail.com> wrote:
>
>> We definitely need to make the WorkerPool created by
>> AbstractTransportListener configurable. The question is whether this
>> should be done using a property file or using parameters in
>> TransportInDescription (as with all other configuration settings).
>> What are the arguments in favor of doing it in a separate property
>> file?
>
>
> Nothing significant I guess. It might probably save us a few bytes of
> memory since with a file based approach we don't really have to save those
> parameters in the configuration context. But that's not a big motivating
> factor IMO. Maybe the only plus point worth considering is that it's just
> consistent with the existing approach for configuring thread pools. We
> currently use nhttp.properties to configure the HTTP-NIO thread pool.
>
> But I'm ok with using transport parameters to do this.
>
> Thanks,
> Hiranya
>
>
>>
>> Andreas
>>
>> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hi...@gmail.com>
>> wrote:
>> >
>> >
>> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ru...@gmail.com>
>> > wrote:
>> >>
>> >> I think we need to make that configurable as well.... currently hard
>> >> coded setting will work in 98% of the cases, but there can be a
>> scenario
>> >> where it requires a tune up.
>> >>
>> >> Can we do this in a manner that we can configure them per transport?
>> >
>> > One simple solution would be to read a transport specific configuration
>> file
>> > at AbstractTransportListener#init(). The init method gets a
>> > TransportInDescription object as an argument and from that we can
>> retrieve
>> > the transport name to construct a file name unique to a given transport
>> (eg:
>> > mail.properties, vfs.properties). This approach has the benefit that it
>> > doesn't require changes to any of the actual transport implementations.
>> > Everything is taken care of by the abstract class.
>> >
>> > However this class now belongs to the WS-Commons transports project. So
>> the
>> > enhancement should be made there.
>> >
>> > Thanks,
>> > Hiranya
>> >
>> >
>> >>
>> >> Thanks,
>> >> Ruwan
>> >>
>> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>> >> <hi...@gmail.com> wrote:
>> >>>
>> >>> Hi Ruwan,
>> >>>
>> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <
>> ruwan.linton@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Hiranya,
>> >>>>
>> >>>> If you can make the worker pool configurable that would be of much
>> >>>> importance... you may have a look at the nhttp transport thread pool,
>> which
>> >>>> can be configured via the nhttp.properties file.
>> >>>
>> >>> Currently the FIX sender initializes the WorkerPool in a manner
>> similar
>> >>> to the AbstractTransportListener. The WorkerPool in
>> >>> AbstractTransportListener is used by several transports (JMS, Mail
>> etc) via
>> >>> inheritance. FIX listener also makes use of the same thread pool. Do
>> we have
>> >>> any plans to make that thread pool configurable too? Otherwise I don't
>> think
>> >>> it makes much sense just to make the FIX sender's thread pool
>> configurable.
>> >>>
>> >>> Thanks,
>> >>> Hiranya
>> >>>
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Ruwan
>> >>>>
>> >>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>> >>>>>
>> >>>>> Author: hiranya
>> >>>>> Date: Tue May 12 08:00:27 2009
>> >>>>> New Revision: 773818
>> >>>>>
>> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>> >>>>> Log:
>> >>>>> Enhancements and code cleanup in the FIX transport:
>> >>>>> * FIX sender now has its own worker pool and hence does not rely on
>> the
>> >>>>> FIX listener any more. Therefore listener and sender can be enabled
>> >>>>> individually
>> >>>>> * Made FIXSessionFactory a singleton to effectively share session
>> data
>> >>>>> among the listener and the sender
>> >>>>> * Cleanup logic for initiators during sender shutdown
>> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>> >>>>> sample 259 and similar scenarios from operating properly
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> >>>>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>>
>> >>>>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>>
>> >>>>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>>
>> >>>>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -27,15 +27,12 @@
>> >>>>>  public class FIXApplicationFactory {
>> >>>>>
>> >>>>>     private ConfigurationContext cfgCtx;
>> >>>>> -    private WorkerPool workerPool;
>> >>>>> -
>> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>> >>>>> WorkerPool workerPool) {
>> >>>>>
>> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>> >>>>>         this.cfgCtx = cfgCtx;
>> >>>>> -        this.workerPool = workerPool;
>> >>>>>     }
>> >>>>>
>> >>>>> -    public Application getFIXApplication(AxisService service,
>> boolean
>> >>>>> acceptor) {
>> >>>>> +    public Application getFIXApplication(AxisService service,
>> >>>>> WorkerPool workerPool, boolean acceptor) {
>> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>> >>>>> service, acceptor);
>> >>>>>     }
>> >>>>>  }
>> >>>>> \ No newline at end of file
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -23,6 +23,7 @@
>> >>>>>  import org.apache.axis2.description.AxisService;
>> >>>>>  import org.apache.axis2.description.Parameter;
>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> >>>>>  import org.apache.commons.logging.Log;
>> >>>>>  import org.apache.commons.logging.LogFactory;
>> >>>>>  import quickfix.*;
>> >>>>> @@ -65,16 +66,29 @@
>> >>>>>     /** A Map containing all the FIX applications created for
>> >>>>> initiators, keyed by FIX EPR */
>> >>>>>     private Map<String, Application> applicationStore;
>> >>>>>     /** An ApplicationFactory handles creating FIX Applications
>> >>>>> (FIXIncomingMessageHandler Objects) */
>> >>>>> -    private FIXApplicationFactory applicationFactory;
>> >>>>> +    private static FIXApplicationFactory applicationFactory = null;
>> >>>>> +
>> >>>>> +    private WorkerPool listenerThreadPool;
>> >>>>> +    private WorkerPool senderThreadPool;
>> >>>>>
>> >>>>>     private Log log;
>> >>>>>
>> >>>>> -    public FIXSessionFactory(FIXApplicationFactory
>> applicationFactory)
>> >>>>> {
>> >>>>> -        this.applicationFactory = applicationFactory;
>> >>>>> +    private static FIXSessionFactory INSTANCE = new
>> >>>>> FIXSessionFactory();
>> >>>>> +
>> >>>>> +    public static FIXSessionFactory
>> getInstance(FIXApplicationFactory
>> >>>>> af) {
>> >>>>> +        if (applicationFactory == null) {
>> >>>>> +            applicationFactory = af;
>> >>>>> +        }
>> >>>>> +        return INSTANCE;
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    private FIXSessionFactory() {
>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>> >>>>>         this.applicationStore = new HashMap<String, Application>();
>> >>>>> +        this.listenerThreadPool = null;
>> >>>>> +        this.senderThreadPool = null;
>> >>>>>     }
>> >>>>>
>> >>>>>     /**
>> >>>>> @@ -101,7 +115,7 @@
>> >>>>>                 MessageFactory messageFactory = new
>> >>>>> DefaultMessageFactory();
>> >>>>>                 quickfix.LogFactory logFactory =
>> getLogFactory(service,
>> >>>>> settings, true);
>> >>>>>                 //Get a new FIX Application
>> >>>>> -                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, true);
>> >>>>> +                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool,
>> true);
>> >>>>>                 //Create a new FIX Acceptor
>> >>>>>                 Acceptor acceptor = new SocketAcceptor(
>> >>>>>                         messageHandler,
>> >>>>> @@ -174,7 +188,7 @@
>> >>>>>         MessageStoreFactory storeFactory =
>> >>>>> getMessageStoreFactory(service, settings, false);
>> >>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>> >>>>>         //Get a new FIX application
>> >>>>> -        Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, false);
>> >>>>> +        Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>> false);
>> >>>>>
>> >>>>>         try {
>> >>>>>            //Create a new FIX initiator
>> >>>>> @@ -216,7 +230,7 @@
>> >>>>>                 MessageFactory messageFactory = new
>> >>>>> DefaultMessageFactory();
>> >>>>>                 quickfix.LogFactory logFactory =
>> getLogFactory(service,
>> >>>>> settings, true);
>> >>>>>                 //Get a new FIX Application
>> >>>>> -                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, false);
>> >>>>> +                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>> false);
>> >>>>>
>> >>>>>                 Initiator initiator = new SocketInitiator(
>> >>>>>                     messageHandler,
>> >>>>> @@ -246,10 +260,10 @@
>> >>>>>             }
>> >>>>>
>> >>>>>         } else {
>> >>>>> -            String msg = "The " +
>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> >>>>> -                    "not specified. Unable to initialize the
>> initiator
>> >>>>> session at this stage.";
>> >>>>> -            log.info(msg);
>> >>>>> -            throw new AxisFault(msg);
>> >>>>> +            // FIX initiator session is not configured
>> >>>>> +            // It could be intentional - So not an error (we don't
>> >>>>> need initiators at all times)
>> >>>>> +            log.info("The " +
>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> >>>>> +                    "not specified. Unable to initialize the
>> initiator
>> >>>>> session at this stage.");
>> >>>>>         }
>> >>>>>     }
>> >>>>>
>> >>>>> @@ -276,6 +290,24 @@
>> >>>>>     }
>> >>>>>
>> >>>>>     /**
>> >>>>> +     * Stops all the FIX initiators created so far and cleans up
>> all
>> >>>>> the mappings
>> >>>>> +     * related to them
>> >>>>> +     */
>> >>>>> +    public void disposeFIXInitiators() {
>> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
>> >>>>> +
>> >>>>> +        for (String key : initiatorStore.keySet()) {
>> >>>>> +            initiatorStore.get(key).stop();
>> >>>>> +            if (debugEnabled) {
>> >>>>> +                log.debug("FIX initiator to the EPR " + key + "
>> >>>>> stopped");
>> >>>>> +            }
>> >>>>> +        }
>> >>>>> +
>> >>>>> +        initiatorStore.clear();
>> >>>>> +        applicationStore.clear();
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    /**
>> >>>>>      * Returns an array of Strings representing EPRs for the
>> specified
>> >>>>> service
>> >>>>>      *
>> >>>>>      * @param serviceName the name of the service
>> >>>>> @@ -444,6 +476,14 @@
>> >>>>>         }
>> >>>>>         return app;
>> >>>>>     }
>> >>>>> +
>> >>>>> +    public void setListenerThreadPool(WorkerPool
>> listenerThreadPool) {
>> >>>>> +        this.listenerThreadPool = listenerThreadPool;
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>> >>>>> +        this.senderThreadPool = senderThreadPool;
>> >>>>> +    }
>> >>>>>  }
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -60,14 +60,8 @@
>> >>>>>                      TransportInDescription trpInDesc) throws
>> AxisFault
>> >>>>> {
>> >>>>>
>> >>>>>         super.init(cfgCtx, trpInDesc);
>> >>>>> -        //initialize the FIXSessionFactory
>> >>>>> -        fixSessionFactory = new FIXSessionFactory(
>> >>>>> -                new FIXApplicationFactory(this.cfgCtx,
>> >>>>> this.workerPool));
>> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>> >>>>> -
>> >>>>>
>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>> >>>>> -        if (sender != null) {
>> >>>>> -            sender.setSessionFactory(fixSessionFactory);
>> >>>>> -        }
>> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>> >>>>> FIXApplicationFactory(cfgCtx));
>> >>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>> >>>>>         log.info("FIX transport listener initialized...");
>> >>>>>     }
>> >>>>>
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -28,6 +28,8 @@
>> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
>> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>> >>>>>  import org.apache.commons.logging.LogFactory;
>> >>>>>  import quickfix.*;
>> >>>>>  import quickfix.field.*;
>> >>>>> @@ -51,17 +53,12 @@
>> >>>>>
>> >>>>>     private FIXSessionFactory sessionFactory;
>> >>>>>     private FIXOutgoingMessageHandler messageSender;
>> >>>>> +    private WorkerPool workerPool;
>> >>>>>
>> >>>>>     public FIXTransportSender() {
>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>> >>>>>     }
>> >>>>>
>> >>>>> -
>> >>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory)
>> {
>> >>>>> -        this.sessionFactory = sessionFactory;
>> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>> >>>>> -    }
>> >>>>> -
>> >>>>>     /**
>> >>>>>      * @param cfgCtx       the axis2 configuration context
>> >>>>>      * @param transportOut the Out Transport description
>> >>>>> @@ -69,10 +66,25 @@
>> >>>>>      */
>> >>>>>     public void init(ConfigurationContext cfgCtx,
>> >>>>> TransportOutDescription transportOut) throws AxisFault {
>> >>>>>         super.init(cfgCtx, transportOut);
>> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>> >>>>> FIXApplicationFactory(cfgCtx));
>> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker
>> thread
>> >>>>> group", "FIX-Worker");
>> >>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>> >>>>>         messageSender = new FIXOutgoingMessageHandler();
>> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>> >>>>>         log.info("FIX transport sender initialized...");
>> >>>>>     }
>> >>>>>
>> >>>>> +    public void stop() {
>> >>>>> +        try {
>> >>>>> +            this.workerPool.shutdown(10000);
>> >>>>> +        } catch (InterruptedException e) {
>> >>>>> +            log.warn("Thread interrupted while waiting for worker
>> pool
>> >>>>> to shut down");
>> >>>>> +        }
>> >>>>> +        sessionFactory.disposeFIXInitiators();
>> >>>>> +        super.stop();
>> >>>>> +    }
>> >>>>> +
>> >>>>>     /**
>> >>>>>      * Performs the actual sending of the message.
>> >>>>>      *
>> >>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> Ruwan Linton
>> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>> >>>> http://wso2.org/esb
>> >>>> WSO2 Inc.; http://wso2.org
>> >>>> email: ruwan@wso2.com; cell: +94 77 341 3097
>> >>>> blog: http://ruwansblog.blogspot.com
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Hiranya Jayathilaka
>> >>> Software Engineer;
>> >>> WSO2 Inc.;  http://wso2.org
>> >>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>> >>> Blog: http://techfeast-hiranya.blogspot.com
>> >>
>> >>
>> >>
>> >> --
>> >> Ruwan Linton
>> >> Senior Software Engineer & Product Manager; WSO2 ESB;
>> http://wso2.org/esb
>> >> WSO2 Inc.; http://wso2.org
>> >> email: ruwan@wso2.com; cell: +94 77 341 3097
>> >> blog: http://ruwansblog.blogspot.com
>> >
>> >
>> >
>> > --
>> > Hiranya Jayathilaka
>> > Software Engineer;
>> > WSO2 Inc.;  http://wso2.org
>> > E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>> > Blog: http://techfeast-hiranya.blogspot.com
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
>> For additional commands, e-mail: dev-help@synapse.apache.org
>>
>>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>



-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: ruwan@wso2.com; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Hiranya Jayathilaka <hi...@gmail.com>.
On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen
<an...@gmail.com>wrote:

> We definitely need to make the WorkerPool created by
> AbstractTransportListener configurable. The question is whether this
> should be done using a property file or using parameters in
> TransportInDescription (as with all other configuration settings).
> What are the arguments in favor of doing it in a separate property
> file?


Nothing significant I guess. It might probably save us a few bytes of memory
since with a file based approach we don't really have to save those
parameters in the configuration context. But that's not a big motivating
factor IMO. Maybe the only plus point worth considering is that it's just
consistent with the existing approach for configuring thread pools. We
currently use nhttp.properties to configure the HTTP-NIO thread pool.

But I'm ok with using transport parameters to do this.

Thanks,
Hiranya


>
> Andreas
>
> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hi...@gmail.com>
> wrote:
> >
> >
> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ru...@gmail.com>
> > wrote:
> >>
> >> I think we need to make that configurable as well.... currently hard
> >> coded setting will work in 98% of the cases, but there can be a
> scenario
> >> where it requires a tune up.
> >>
> >> Can we do this in a manner that we can configure them per transport?
> >
> > One simple solution would be to read a transport specific configuration
> file
> > at AbstractTransportListener#init(). The init method gets a
> > TransportInDescription object as an argument and from that we can
> retrieve
> > the transport name to construct a file name unique to a given transport
> (eg:
> > mail.properties, vfs.properties). This approach has the benefit that it
> > doesn't require changes to any of the actual transport implementations.
> > Everything is taken care of by the abstract class.
> >
> > However this class now belongs to the WS-Commons transports project. So
> the
> > enhancement should be made there.
> >
> > Thanks,
> > Hiranya
> >
> >
> >>
> >> Thanks,
> >> Ruwan
> >>
> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
> >> <hi...@gmail.com> wrote:
> >>>
> >>> Hi Ruwan,
> >>>
> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ruwan.linton@gmail.com
> >
> >>> wrote:
> >>>>
> >>>> Hiranya,
> >>>>
> >>>> If you can make the worker pool configurable that would be of much
> >>>> importance... you may have a look at the nhttp transport thread pool,
> which
> >>>> can be configured via the nhttp.properties file.
> >>>
> >>> Currently the FIX sender initializes the WorkerPool in a manner similar
> >>> to the AbstractTransportListener. The WorkerPool in
> >>> AbstractTransportListener is used by several transports (JMS, Mail etc)
> via
> >>> inheritance. FIX listener also makes use of the same thread pool. Do we
> have
> >>> any plans to make that thread pool configurable too? Otherwise I don't
> think
> >>> it makes much sense just to make the FIX sender's thread pool
> configurable.
> >>>
> >>> Thanks,
> >>> Hiranya
> >>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>> Ruwan
> >>>>
> >>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
> >>>>>
> >>>>> Author: hiranya
> >>>>> Date: Tue May 12 08:00:27 2009
> >>>>> New Revision: 773818
> >>>>>
> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
> >>>>> Log:
> >>>>> Enhancements and code cleanup in the FIX transport:
> >>>>> * FIX sender now has its own worker pool and hence does not rely on
> the
> >>>>> FIX listener any more. Therefore listener and sender can be enabled
> >>>>> individually
> >>>>> * Made FIXSessionFactory a singleton to effectively share session
> data
> >>>>> among the listener and the sender
> >>>>> * Cleanup logic for initiators during sender shutdown
> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
> >>>>> sample 259 and similar scenarios from operating properly
> >>>>>
> >>>>> Modified:
> >>>>>
> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>>
> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>>
> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>>
> >>>>>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>>
> >>>>> Modified:
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>> URL:
> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>>
> >>>>>
> ==============================================================================
> >>>>> ---
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>> (original)
> >>>>> +++
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>> Tue May 12 08:00:27 2009
> >>>>> @@ -27,15 +27,12 @@
> >>>>>  public class FIXApplicationFactory {
> >>>>>
> >>>>>     private ConfigurationContext cfgCtx;
> >>>>> -    private WorkerPool workerPool;
> >>>>> -
> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
> >>>>> WorkerPool workerPool) {
> >>>>>
> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
> >>>>>         this.cfgCtx = cfgCtx;
> >>>>> -        this.workerPool = workerPool;
> >>>>>     }
> >>>>>
> >>>>> -    public Application getFIXApplication(AxisService service,
> boolean
> >>>>> acceptor) {
> >>>>> +    public Application getFIXApplication(AxisService service,
> >>>>> WorkerPool workerPool, boolean acceptor) {
> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
> >>>>> service, acceptor);
> >>>>>     }
> >>>>>  }
> >>>>> \ No newline at end of file
> >>>>>
> >>>>> Modified:
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>> URL:
> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>>
> >>>>>
> ==============================================================================
> >>>>> ---
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>> (original)
> >>>>> +++
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>> Tue May 12 08:00:27 2009
> >>>>> @@ -23,6 +23,7 @@
> >>>>>  import org.apache.axis2.description.AxisService;
> >>>>>  import org.apache.axis2.description.Parameter;
> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>>>>  import org.apache.commons.logging.Log;
> >>>>>  import org.apache.commons.logging.LogFactory;
> >>>>>  import quickfix.*;
> >>>>> @@ -65,16 +66,29 @@
> >>>>>     /** A Map containing all the FIX applications created for
> >>>>> initiators, keyed by FIX EPR */
> >>>>>     private Map<String, Application> applicationStore;
> >>>>>     /** An ApplicationFactory handles creating FIX Applications
> >>>>> (FIXIncomingMessageHandler Objects) */
> >>>>> -    private FIXApplicationFactory applicationFactory;
> >>>>> +    private static FIXApplicationFactory applicationFactory = null;
> >>>>> +
> >>>>> +    private WorkerPool listenerThreadPool;
> >>>>> +    private WorkerPool senderThreadPool;
> >>>>>
> >>>>>     private Log log;
> >>>>>
> >>>>> -    public FIXSessionFactory(FIXApplicationFactory
> applicationFactory)
> >>>>> {
> >>>>> -        this.applicationFactory = applicationFactory;
> >>>>> +    private static FIXSessionFactory INSTANCE = new
> >>>>> FIXSessionFactory();
> >>>>> +
> >>>>> +    public static FIXSessionFactory
> getInstance(FIXApplicationFactory
> >>>>> af) {
> >>>>> +        if (applicationFactory == null) {
> >>>>> +            applicationFactory = af;
> >>>>> +        }
> >>>>> +        return INSTANCE;
> >>>>> +    }
> >>>>> +
> >>>>> +    private FIXSessionFactory() {
> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
> >>>>>         this.applicationStore = new HashMap<String, Application>();
> >>>>> +        this.listenerThreadPool = null;
> >>>>> +        this.senderThreadPool = null;
> >>>>>     }
> >>>>>
> >>>>>     /**
> >>>>> @@ -101,7 +115,7 @@
> >>>>>                 MessageFactory messageFactory = new
> >>>>> DefaultMessageFactory();
> >>>>>                 quickfix.LogFactory logFactory =
> getLogFactory(service,
> >>>>> settings, true);
> >>>>>                 //Get a new FIX Application
> >>>>> -                Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, true);
> >>>>> +                Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool,
> true);
> >>>>>                 //Create a new FIX Acceptor
> >>>>>                 Acceptor acceptor = new SocketAcceptor(
> >>>>>                         messageHandler,
> >>>>> @@ -174,7 +188,7 @@
> >>>>>         MessageStoreFactory storeFactory =
> >>>>> getMessageStoreFactory(service, settings, false);
> >>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
> >>>>>         //Get a new FIX application
> >>>>> -        Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, false);
> >>>>> +        Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
> false);
> >>>>>
> >>>>>         try {
> >>>>>            //Create a new FIX initiator
> >>>>> @@ -216,7 +230,7 @@
> >>>>>                 MessageFactory messageFactory = new
> >>>>> DefaultMessageFactory();
> >>>>>                 quickfix.LogFactory logFactory =
> getLogFactory(service,
> >>>>> settings, true);
> >>>>>                 //Get a new FIX Application
> >>>>> -                Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, false);
> >>>>> +                Application messageHandler =
> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
> false);
> >>>>>
> >>>>>                 Initiator initiator = new SocketInitiator(
> >>>>>                     messageHandler,
> >>>>> @@ -246,10 +260,10 @@
> >>>>>             }
> >>>>>
> >>>>>         } else {
> >>>>> -            String msg = "The " +
> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>>>> -                    "not specified. Unable to initialize the
> initiator
> >>>>> session at this stage.";
> >>>>> -            log.info(msg);
> >>>>> -            throw new AxisFault(msg);
> >>>>> +            // FIX initiator session is not configured
> >>>>> +            // It could be intentional - So not an error (we don't
> >>>>> need initiators at all times)
> >>>>> +            log.info("The " +
> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>>>> +                    "not specified. Unable to initialize the
> initiator
> >>>>> session at this stage.");
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> @@ -276,6 +290,24 @@
> >>>>>     }
> >>>>>
> >>>>>     /**
> >>>>> +     * Stops all the FIX initiators created so far and cleans up all
> >>>>> the mappings
> >>>>> +     * related to them
> >>>>> +     */
> >>>>> +    public void disposeFIXInitiators() {
> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
> >>>>> +
> >>>>> +        for (String key : initiatorStore.keySet()) {
> >>>>> +            initiatorStore.get(key).stop();
> >>>>> +            if (debugEnabled) {
> >>>>> +                log.debug("FIX initiator to the EPR " + key + "
> >>>>> stopped");
> >>>>> +            }
> >>>>> +        }
> >>>>> +
> >>>>> +        initiatorStore.clear();
> >>>>> +        applicationStore.clear();
> >>>>> +    }
> >>>>> +
> >>>>> +    /**
> >>>>>      * Returns an array of Strings representing EPRs for the
> specified
> >>>>> service
> >>>>>      *
> >>>>>      * @param serviceName the name of the service
> >>>>> @@ -444,6 +476,14 @@
> >>>>>         }
> >>>>>         return app;
> >>>>>     }
> >>>>> +
> >>>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool)
> {
> >>>>> +        this.listenerThreadPool = listenerThreadPool;
> >>>>> +    }
> >>>>> +
> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
> >>>>> +        this.senderThreadPool = senderThreadPool;
> >>>>> +    }
> >>>>>  }
> >>>>>
> >>>>>
> >>>>>
> >>>>> Modified:
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>> URL:
> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>>
> >>>>>
> ==============================================================================
> >>>>> ---
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>> (original)
> >>>>> +++
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>> Tue May 12 08:00:27 2009
> >>>>> @@ -60,14 +60,8 @@
> >>>>>                      TransportInDescription trpInDesc) throws
> AxisFault
> >>>>> {
> >>>>>
> >>>>>         super.init(cfgCtx, trpInDesc);
> >>>>> -        //initialize the FIXSessionFactory
> >>>>> -        fixSessionFactory = new FIXSessionFactory(
> >>>>> -                new FIXApplicationFactory(this.cfgCtx,
> >>>>> this.workerPool));
> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
> >>>>> -
> >>>>>
>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
> >>>>> -        if (sender != null) {
> >>>>> -            sender.setSessionFactory(fixSessionFactory);
> >>>>> -        }
> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
> >>>>> FIXApplicationFactory(cfgCtx));
> >>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
> >>>>>         log.info("FIX transport listener initialized...");
> >>>>>     }
> >>>>>
> >>>>>
> >>>>> Modified:
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>> URL:
> >>>>>
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>>
> >>>>>
> ==============================================================================
> >>>>> ---
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>> (original)
> >>>>> +++
> >>>>>
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>> Tue May 12 08:00:27 2009
> >>>>> @@ -28,6 +28,8 @@
> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
> >>>>>  import org.apache.commons.logging.LogFactory;
> >>>>>  import quickfix.*;
> >>>>>  import quickfix.field.*;
> >>>>> @@ -51,17 +53,12 @@
> >>>>>
> >>>>>     private FIXSessionFactory sessionFactory;
> >>>>>     private FIXOutgoingMessageHandler messageSender;
> >>>>> +    private WorkerPool workerPool;
> >>>>>
> >>>>>     public FIXTransportSender() {
> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>>>>     }
> >>>>>
> >>>>> -
> >>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory)
> {
> >>>>> -        this.sessionFactory = sessionFactory;
> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
> >>>>> -    }
> >>>>> -
> >>>>>     /**
> >>>>>      * @param cfgCtx       the axis2 configuration context
> >>>>>      * @param transportOut the Out Transport description
> >>>>> @@ -69,10 +66,25 @@
> >>>>>      */
> >>>>>     public void init(ConfigurationContext cfgCtx,
> >>>>> TransportOutDescription transportOut) throws AxisFault {
> >>>>>         super.init(cfgCtx, transportOut);
> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
> >>>>> FIXApplicationFactory(cfgCtx));
> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker thread
> >>>>> group", "FIX-Worker");
> >>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
> >>>>>         messageSender = new FIXOutgoingMessageHandler();
> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
> >>>>>         log.info("FIX transport sender initialized...");
> >>>>>     }
> >>>>>
> >>>>> +    public void stop() {
> >>>>> +        try {
> >>>>> +            this.workerPool.shutdown(10000);
> >>>>> +        } catch (InterruptedException e) {
> >>>>> +            log.warn("Thread interrupted while waiting for worker
> pool
> >>>>> to shut down");
> >>>>> +        }
> >>>>> +        sessionFactory.disposeFIXInitiators();
> >>>>> +        super.stop();
> >>>>> +    }
> >>>>> +
> >>>>>     /**
> >>>>>      * Performs the actual sending of the message.
> >>>>>      *
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Ruwan Linton
> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
> >>>> http://wso2.org/esb
> >>>> WSO2 Inc.; http://wso2.org
> >>>> email: ruwan@wso2.com; cell: +94 77 341 3097
> >>>> blog: http://ruwansblog.blogspot.com
> >>>
> >>>
> >>>
> >>> --
> >>> Hiranya Jayathilaka
> >>> Software Engineer;
> >>> WSO2 Inc.;  http://wso2.org
> >>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> >>> Blog: http://techfeast-hiranya.blogspot.com
> >>
> >>
> >>
> >> --
> >> Ruwan Linton
> >> Senior Software Engineer & Product Manager; WSO2 ESB;
> http://wso2.org/esb
> >> WSO2 Inc.; http://wso2.org
> >> email: ruwan@wso2.com; cell: +94 77 341 3097
> >> blog: http://ruwansblog.blogspot.com
> >
> >
> >
> > --
> > Hiranya Jayathilaka
> > Software Engineer;
> > WSO2 Inc.;  http://wso2.org
> > E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> > Blog: http://techfeast-hiranya.blogspot.com
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
> For additional commands, e-mail: dev-help@synapse.apache.org
>
>


-- 
Hiranya Jayathilaka
Software Engineer;
WSO2 Inc.;  http://wso2.org
E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
Blog: http://techfeast-hiranya.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Andreas Veithen <an...@gmail.com>.
We definitely need to make the WorkerPool created by
AbstractTransportListener configurable. The question is whether this
should be done using a property file or using parameters in
TransportInDescription (as with all other configuration settings).
What are the arguments in favor of doing it in a separate property
file?

Andreas

On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hi...@gmail.com> wrote:
>
>
> On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ru...@gmail.com>
> wrote:
>>
>> I think we need to make that configurable as well.... currently hard
>> coded setting will work in 98% of the cases, but there can be a scenario
>> where it requires a tune up.
>>
>> Can we do this in a manner that we can configure them per transport?
>
> One simple solution would be to read a transport specific configuration file
> at AbstractTransportListener#init(). The init method gets a
> TransportInDescription object as an argument and from that we can retrieve
> the transport name to construct a file name unique to a given transport (eg:
> mail.properties, vfs.properties). This approach has the benefit that it
> doesn't require changes to any of the actual transport implementations.
> Everything is taken care of by the abstract class.
>
> However this class now belongs to the WS-Commons transports project. So the
> enhancement should be made there.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>> <hi...@gmail.com> wrote:
>>>
>>> Hi Ruwan,
>>>
>>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ru...@gmail.com>
>>> wrote:
>>>>
>>>> Hiranya,
>>>>
>>>> If you can make the worker pool configurable that would be of much
>>>> importance... you may have a look at the nhttp transport thread pool, which
>>>> can be configurable via the nhttp.properties file.
>>>
>>> Currently the FIX sender initializes the WorkerPool in a manner similar
>>> to the AbstractTransportListener. The WorkerPool in
>>> AbstractTransportListener is used by several transports (JMS, Mail etc) via
>>> inheritance. FIX listener also makes use of the same thread pool. Do we have
>>> any plans to make that thread pool configurable too? Otherwise I don't think
>>> it makes much sense just to make the FIX sender's thread pool configurable.
>>>
>>> Thanks,
>>> Hiranya
>>>
>>>>
>>>>
>>>> Thanks,
>>>> Ruwan
>>>>
>>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>>>>>
>>>>> Author: hiranya
>>>>> Date: Tue May 12 08:00:27 2009
>>>>> New Revision: 773818
>>>>>
>>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>>>> Log:
>>>>> Enhancements and code cleanup in the FIX transport:
>>>>> * FIX sender now has its own worker pool and hence does not rely on the
>>>>> FIX listener any more. Therefore listener and sender can be enabled
>>>>> individually
>>>>> * Made FIXSessionFactory a singleton to effectively share session data
>>>>> among the listener and the sender
>>>>> * Cleanup logic for initiators during sender shutdown
>>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>>>>> sample 259 and similar scenarios from operating properly
>>>>>
>>>>> Modified:
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -27,15 +27,12 @@
>>>>>  public class FIXApplicationFactory {
>>>>>
>>>>>     private ConfigurationContext cfgCtx;
>>>>> -    private WorkerPool workerPool;
>>>>> -
>>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>>>>> WorkerPool workerPool) {
>>>>>
>>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>>>         this.cfgCtx = cfgCtx;
>>>>> -        this.workerPool = workerPool;
>>>>>     }
>>>>>
>>>>> -    public Application getFIXApplication(AxisService service, boolean
>>>>> acceptor) {
>>>>> +    public Application getFIXApplication(AxisService service,
>>>>> WorkerPool workerPool, boolean acceptor) {
>>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>>>>> service, acceptor);
>>>>>     }
>>>>>  }
>>>>> \ No newline at end of file
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -23,6 +23,7 @@
>>>>>  import org.apache.axis2.description.AxisService;
>>>>>  import org.apache.axis2.description.Parameter;
>>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>>  import org.apache.commons.logging.Log;
>>>>>  import org.apache.commons.logging.LogFactory;
>>>>>  import quickfix.*;
>>>>> @@ -65,16 +66,29 @@
>>>>>     /** A Map containing all the FIX applications created for
>>>>> initiators, keyed by FIX EPR */
>>>>>     private Map<String, Application> applicationStore;
>>>>>     /** An ApplicationFactory handles creating FIX Applications
>>>>> (FIXIncomingMessageHandler Objects) */
>>>>> -    private FIXApplicationFactory applicationFactory;
>>>>> +    private static FIXApplicationFactory applicationFactory = null;
>>>>> +
>>>>> +    private WorkerPool listenerThreadPool;
>>>>> +    private WorkerPool senderThreadPool;
>>>>>
>>>>>     private Log log;
>>>>>
>>>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory)
>>>>> {
>>>>> -        this.applicationFactory = applicationFactory;
>>>>> +    private static FIXSessionFactory INSTANCE = new
>>>>> FIXSessionFactory();
>>>>> +
>>>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory
>>>>> af) {
>>>>> +        if (applicationFactory == null) {
>>>>> +            applicationFactory = af;
>>>>> +        }
>>>>> +        return INSTANCE;
>>>>> +    }
>>>>> +
>>>>> +    private FIXSessionFactory() {
>>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>>>>         this.applicationStore = new HashMap<String, Application>();
>>>>> +        this.listenerThreadPool = null;
>>>>> +        this.senderThreadPool = null;
>>>>>     }
>>>>>
>>>>>     /**
>>>>> @@ -101,7 +115,7 @@
>>>>>                 MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>>                 //Get a new FIX Application
>>>>> -                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, true);
>>>>> +                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>>>                 //Create a new FIX Acceptor
>>>>>                 Acceptor acceptor = new SocketAcceptor(
>>>>>                         messageHandler,
>>>>> @@ -174,7 +188,7 @@
>>>>>         MessageStoreFactory storeFactory =
>>>>> getMessageStoreFactory(service, settings, false);
>>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>>>>         //Get a new FIX application
>>>>> -        Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> +        Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>>         try {
>>>>>            //Create a new FIX initiator
>>>>> @@ -216,7 +230,7 @@
>>>>>                 MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>>                 //Get a new FIX Application
>>>>> -                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> +                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>>                 Initiator initiator = new SocketInitiator(
>>>>>                     messageHandler,
>>>>> @@ -246,10 +260,10 @@
>>>>>             }
>>>>>
>>>>>         } else {
>>>>> -            String msg = "The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> -                    "not specified. Unable to initialize the initiator
>>>>> session at this stage.";
>>>>> -            log.info(msg);
>>>>> -            throw new AxisFault(msg);
>>>>> +            // FIX initiator session is not configured
>>>>> +            // It could be intentional - So not an error (we don't
>>>>> need initiators at all times)
>>>>> +            log.info("The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> +                    "not specified. Unable to initialize the initiator
>>>>> session at this stage.");
>>>>>         }
>>>>>     }
>>>>>
>>>>> @@ -276,6 +290,24 @@
>>>>>     }
>>>>>
>>>>>     /**
>>>>> +     * Stops all the FIX initiators created so far and cleans up all
>>>>> the mappings
>>>>> +     * related to them
>>>>> +     */
>>>>> +    public void disposeFIXInitiators() {
>>>>> +        boolean debugEnabled = log.isDebugEnabled();
>>>>> +
>>>>> +        for (String key : initiatorStore.keySet()) {
>>>>> +            initiatorStore.get(key).stop();
>>>>> +            if (debugEnabled) {
>>>>> +                log.debug("FIX initiator to the EPR " + key + "
>>>>> stopped");
>>>>> +            }
>>>>> +        }
>>>>> +
>>>>> +        initiatorStore.clear();
>>>>> +        applicationStore.clear();
>>>>> +    }
>>>>> +
>>>>> +    /**
>>>>>      * Returns an array of Strings representing EPRs for the specified
>>>>> service
>>>>>      *
>>>>>      * @param serviceName the name of the service
>>>>> @@ -444,6 +476,14 @@
>>>>>         }
>>>>>         return app;
>>>>>     }
>>>>> +
>>>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>>>> +        this.listenerThreadPool = listenerThreadPool;
>>>>> +    }
>>>>> +
>>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>>>> +        this.senderThreadPool = senderThreadPool;
>>>>> +    }
>>>>>  }
>>>>>
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -60,14 +60,8 @@
>>>>>                      TransportInDescription trpInDesc) throws AxisFault
>>>>> {
>>>>>
>>>>>         super.init(cfgCtx, trpInDesc);
>>>>> -        //initialize the FIXSessionFactory
>>>>> -        fixSessionFactory = new FIXSessionFactory(
>>>>> -                new FIXApplicationFactory(this.cfgCtx,
>>>>> this.workerPool));
>>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>>>> -
>>>>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>>>> -        if (sender != null) {
>>>>> -            sender.setSessionFactory(fixSessionFactory);
>>>>> -        }
>>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>>>         log.info("FIX transport listener initialized...");
>>>>>     }
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -28,6 +28,8 @@
>>>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>>>  import org.apache.commons.logging.LogFactory;
>>>>>  import quickfix.*;
>>>>>  import quickfix.field.*;
>>>>> @@ -51,17 +53,12 @@
>>>>>
>>>>>     private FIXSessionFactory sessionFactory;
>>>>>     private FIXOutgoingMessageHandler messageSender;
>>>>> +    private WorkerPool workerPool;
>>>>>
>>>>>     public FIXTransportSender() {
>>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>>     }
>>>>>
>>>>> -
>>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>>>> -        this.sessionFactory = sessionFactory;
>>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>>>> -    }
>>>>> -
>>>>>     /**
>>>>>      * @param cfgCtx       the axis2 configuration context
>>>>>      * @param transportOut the Out Transport description
>>>>> @@ -69,10 +66,25 @@
>>>>>      */
>>>>>     public void init(ConfigurationContext cfgCtx,
>>>>> TransportOutDescription transportOut) throws AxisFault {
>>>>>         super.init(cfgCtx, transportOut);
>>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>>>> +                            10, 20, 5, -1, "FIX Sender Worker thread
>>>>> group", "FIX-Worker");
>>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>>>         messageSender = new FIXOutgoingMessageHandler();
>>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>>>         log.info("FIX transport sender initialized...");
>>>>>     }
>>>>>
>>>>> +    public void stop() {
>>>>> +        try {
>>>>> +            this.workerPool.shutdown(10000);
>>>>> +        } catch (InterruptedException e) {
>>>>> +            log.warn("Thread interrupted while waiting for worker pool
>>>>> to shut down");
>>>>> +        }
>>>>> +        sessionFactory.disposeFIXInitiators();
>>>>> +        super.stop();
>>>>> +    }
>>>>> +
>>>>>     /**
>>>>>      * Performs the actual sending of the message.
>>>>>      *
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ruwan Linton
>>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>>>> http://wso2.org/esb
>>>> WSO2 Inc.; http://wso2.org
>>>> email: ruwan@wso2.com; cell: +94 77 341 3097
>>>> blog: http://ruwansblog.blogspot.com
>>>
>>>
>>>
>>> --
>>> Hiranya Jayathilaka
>>> Software Engineer;
>>> WSO2 Inc.;  http://wso2.org
>>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>>> Blog: http://techfeast-hiranya.blogspot.com
>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: ruwan@wso2.com; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
For additional commands, e-mail: dev-help@synapse.apache.org


Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Hiranya Jayathilaka <hi...@gmail.com>.
On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ru...@gmail.com>wrote:

> I think we need to make that configurable as well.... currently hard coded
> setting will work in 98% of the cases, but there can be a scenario where it
> requires a tune up.
>
> Can we do this in a manner that we can configure them per transport?


One simple solution would be to read a transport specific configuration file
at AbstractTransportListener#init(). The init method gets a
TransportInDescription object as an argument and from that we can retrieve
the transport name to construct a file name unique to a given transport (eg:
mail.properties, vfs.properties). This approach has the benefit that it
doesn't require changes to any of the actual transport implementations.
Everything is taken care of by the abstract class.

However this class now belongs to the WS-Commons transports project. So the
enhancement should be made there.

Thanks,
Hiranya



>
>
> Thanks,
> Ruwan
>
>
> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka <
> hiranya911@gmail.com> wrote:
>
>> Hi Ruwan,
>>
>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ru...@gmail.com>wrote:
>>
>>> Hiranya,
>>>
>>> If you can make the worker pool configurable that would be of much
>>> importance... you may have a look at the nhttp transport thread pool, which
>>> can be configurable via the nhttp.properties file.
>>
>>
>> Currently the FIX sender initializes the WorkerPool in a manner similar to
>> the AbstractTransportListener. The WorkerPool in AbstractTransportListener
>> is used by several transports (JMS, Mail etc) via inheritance. FIX listener
>> also makes use of the same thread pool. Do we have any plans to make that
>> thread pool configurable too? Otherwise I don't think it makes much sense
>> just to make the FIX sender's thread pool configurable.
>>
>> Thanks,
>> Hiranya
>>
>>
>>>
>>> Thanks,
>>> Ruwan
>>>
>>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>>>
>>>> Author: hiranya
>>>> Date: Tue May 12 08:00:27 2009
>>>> New Revision: 773818
>>>>
>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>>> Log:
>>>> Enhancements and code cleanup in the FIX transport:
>>>> * FIX sender now has its own worker pool and hence does not rely on the
>>>> FIX listener any more. Therefore listener and sender can be enabled
>>>> individually
>>>> * Made FIXSessionFactory a singleton to effectively share session data
>>>> among the listener and the sender
>>>> * Cleanup logic for initiators during sender shutdown
>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>>>> sample 259 and similar scenarios from operating properly
>>>>
>>>> Modified:
>>>>
>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>
>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>
>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>
>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>
>>>> Modified:
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>
>>>> ==============================================================================
>>>> ---
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>> (original)
>>>> +++
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>> Tue May 12 08:00:27 2009
>>>> @@ -27,15 +27,12 @@
>>>>  public class FIXApplicationFactory {
>>>>
>>>>     private ConfigurationContext cfgCtx;
>>>> -    private WorkerPool workerPool;
>>>> -
>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>>>> WorkerPool workerPool) {
>>>>
>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>>         this.cfgCtx = cfgCtx;
>>>> -        this.workerPool = workerPool;
>>>>     }
>>>>
>>>> -    public Application getFIXApplication(AxisService service, boolean
>>>> acceptor) {
>>>> +    public Application getFIXApplication(AxisService service,
>>>> WorkerPool workerPool, boolean acceptor) {
>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>>>> service, acceptor);
>>>>     }
>>>>  }
>>>> \ No newline at end of file
>>>>
>>>> Modified:
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>
>>>> ==============================================================================
>>>> ---
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>> (original)
>>>> +++
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>> Tue May 12 08:00:27 2009
>>>> @@ -23,6 +23,7 @@
>>>>  import org.apache.axis2.description.AxisService;
>>>>  import org.apache.axis2.description.Parameter;
>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>  import org.apache.commons.logging.Log;
>>>>  import org.apache.commons.logging.LogFactory;
>>>>  import quickfix.*;
>>>> @@ -65,16 +66,29 @@
>>>>     /** A Map containing all the FIX applications created for
>>>> initiators, keyed by FIX EPR */
>>>>     private Map<String, Application> applicationStore;
>>>>     /** An ApplicationFactory handles creating FIX Applications
>>>> (FIXIncomingMessageHandler Objects) */
>>>> -    private FIXApplicationFactory applicationFactory;
>>>> +    private static FIXApplicationFactory applicationFactory = null;
>>>> +
>>>> +    private WorkerPool listenerThreadPool;
>>>> +    private WorkerPool senderThreadPool;
>>>>
>>>>     private Log log;
>>>>
>>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory)
>>>> {
>>>> -        this.applicationFactory = applicationFactory;
>>>> +    private static FIXSessionFactory INSTANCE = new
>>>> FIXSessionFactory();
>>>> +
>>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory
>>>> af) {
>>>> +        if (applicationFactory == null) {
>>>> +            applicationFactory = af;
>>>> +        }
>>>> +        return INSTANCE;
>>>> +    }
>>>> +
>>>> +    private FIXSessionFactory() {
>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>>>         this.applicationStore = new HashMap<String, Application>();
>>>> +        this.listenerThreadPool = null;
>>>> +        this.senderThreadPool = null;
>>>>     }
>>>>
>>>>     /**
>>>> @@ -101,7 +115,7 @@
>>>>                 MessageFactory messageFactory = new
>>>> DefaultMessageFactory();
>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>> settings, true);
>>>>                 //Get a new FIX Application
>>>> -                Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, true);
>>>> +                Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>>                 //Create a new FIX Acceptor
>>>>                 Acceptor acceptor = new SocketAcceptor(
>>>>                         messageHandler,
>>>> @@ -174,7 +188,7 @@
>>>>         MessageStoreFactory storeFactory =
>>>> getMessageStoreFactory(service, settings, false);
>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>>>         //Get a new FIX application
>>>> -        Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, false);
>>>> +        Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>
>>>>         try {
>>>>            //Create a new FIX initiator
>>>> @@ -216,7 +230,7 @@
>>>>                 MessageFactory messageFactory = new
>>>> DefaultMessageFactory();
>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>> settings, true);
>>>>                 //Get a new FIX Application
>>>> -                Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, false);
>>>> +                Application messageHandler =
>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>
>>>>                 Initiator initiator = new SocketInitiator(
>>>>                     messageHandler,
>>>> @@ -246,10 +260,10 @@
>>>>             }
>>>>
>>>>         } else {
>>>> -            String msg = "The " +
>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>> -                    "not specified. Unable to initialize the initiator
>>>> session at this stage.";
>>>> -            log.info(msg);
>>>> -            throw new AxisFault(msg);
>>>> +            // FIX initiator session is not configured
>>>> +            // It could be intentional - So not an error (we don't need
>>>> initiators at all times)
>>>> +            log.info("The " +
>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>> +                    "not specified. Unable to initialize the initiator
>>>> session at this stage.");
>>>>         }
>>>>     }
>>>>
>>>> @@ -276,6 +290,24 @@
>>>>     }
>>>>
>>>>     /**
>>>> +     * Stops all the FIX initiators created so far and cleans up all
>>>> the mappings
>>>> +     * related to them
>>>> +     */
>>>> +    public void disposeFIXInitiators() {
>>>> +        boolean debugEnabled = log.isDebugEnabled();
>>>> +
>>>> +        for (String key : initiatorStore.keySet()) {
>>>> +            initiatorStore.get(key).stop();
>>>> +            if (debugEnabled) {
>>>> +                log.debug("FIX initiator to the EPR " + key + "
>>>> stopped");
>>>> +            }
>>>> +        }
>>>> +
>>>> +        initiatorStore.clear();
>>>> +        applicationStore.clear();
>>>> +    }
>>>> +
>>>> +    /**
>>>>      * Returns an array of Strings representing EPRs for the specified
>>>> service
>>>>      *
>>>>      * @param serviceName the name of the service
>>>> @@ -444,6 +476,14 @@
>>>>         }
>>>>         return app;
>>>>     }
>>>> +
>>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>>> +        this.listenerThreadPool = listenerThreadPool;
>>>> +    }
>>>> +
>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>>> +        this.senderThreadPool = senderThreadPool;
>>>> +    }
>>>>  }
>>>>
>>>>
>>>>
>>>> Modified:
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>
>>>> ==============================================================================
>>>> ---
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>> (original)
>>>> +++
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>> Tue May 12 08:00:27 2009
>>>> @@ -60,14 +60,8 @@
>>>>                      TransportInDescription trpInDesc) throws AxisFault
>>>> {
>>>>
>>>>         super.init(cfgCtx, trpInDesc);
>>>> -        //initialize the FIXSessionFactory
>>>> -        fixSessionFactory = new FIXSessionFactory(
>>>> -                new FIXApplicationFactory(this.cfgCtx,
>>>> this.workerPool));
>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>>> -
>>>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>>> -        if (sender != null) {
>>>> -            sender.setSessionFactory(fixSessionFactory);
>>>> -        }
>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>>> FIXApplicationFactory(cfgCtx));
>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>>         log.info("FIX transport listener initialized...");
>>>>     }
>>>>
>>>>
>>>> Modified:
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>
>>>> ==============================================================================
>>>> ---
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>> (original)
>>>> +++
>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>> Tue May 12 08:00:27 2009
>>>> @@ -28,6 +28,8 @@
>>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>>  import org.apache.commons.logging.LogFactory;
>>>>  import quickfix.*;
>>>>  import quickfix.field.*;
>>>> @@ -51,17 +53,12 @@
>>>>
>>>>     private FIXSessionFactory sessionFactory;
>>>>     private FIXOutgoingMessageHandler messageSender;
>>>> +    private WorkerPool workerPool;
>>>>
>>>>     public FIXTransportSender() {
>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>     }
>>>>
>>>> -
>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>>> -        this.sessionFactory = sessionFactory;
>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>>> -    }
>>>> -
>>>>     /**
>>>>      * @param cfgCtx       the axis2 configuration context
>>>>      * @param transportOut the Out Transport description
>>>> @@ -69,10 +66,25 @@
>>>>      */
>>>>     public void init(ConfigurationContext cfgCtx,
>>>> TransportOutDescription transportOut) throws AxisFault {
>>>>         super.init(cfgCtx, transportOut);
>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>>> FIXApplicationFactory(cfgCtx));
>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>>> +                            10, 20, 5, -1, "FIX Sender Worker thread
>>>> group", "FIX-Worker");
>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>>         messageSender = new FIXOutgoingMessageHandler();
>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>>         log.info("FIX transport sender initialized...");
>>>>     }
>>>>
>>>> +    public void stop() {
>>>> +        try {
>>>> +            this.workerPool.shutdown(10000);
>>>> +        } catch (InterruptedException e) {
>>>> +            log.warn("Thread interrupted while waiting for worker pool
>>>> to shut down");
>>>> +        }
>>>> +        sessionFactory.disposeFIXInitiators();
>>>> +        super.stop();
>>>> +    }
>>>> +
>>>>     /**
>>>>      * Performs the actual sending of the message.
>>>>      *
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ruwan Linton
>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>>> http://wso2.org/esb
>>> WSO2 Inc.; http://wso2.org
>>> email: ruwan@wso2.com; cell: +94 77 341 3097
>>> blog: http://ruwansblog.blogspot.com
>>>
>>
>>
>>
>> --
>> Hiranya Jayathilaka
>> Software Engineer;
>> WSO2 Inc.;  http://wso2.org
>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>> Blog: http://techfeast-hiranya.blogspot.com
>>
>
>
>
> --
> Ruwan Linton
> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
> WSO2 Inc.; http://wso2.org
> email: ruwan@wso2.com; cell: +94 77 341 3097
> blog: http://ruwansblog.blogspot.com
>



-- 
Hiranya Jayathilaka
Software Engineer;
WSO2 Inc.;  http://wso2.org
E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
Blog: http://techfeast-hiranya.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Ruwan Linton <ru...@gmail.com>.
I think we need to make that configurable as well. The current hard-coded
setting will work in 98% of the cases, but there can be scenarios where it
requires tuning.

Can we do this in such a way that the thread pools can be configured per transport?

Thanks,
Ruwan

On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
<hi...@gmail.com>wrote:

> Hi Ruwan,
>
> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ru...@gmail.com>wrote:
>
>> Hiranya,
>>
>> If you can make the worker pool configurable that would be of much
>> importance... you may have a look at the nhttp transport thread pool, which
>> can be configurable via the nhttp.properties file.
>
>
> Currently the FIX sender initializes the WorkerPool in a manner similar to
> the AbstractTransportListener. The WorkerPool in AbstractTransportListener
> is used by several transports (JMS, Mail etc) via inheritance. FIX listener
> also makes use of the same thread pool. Do we have any plans to make that
> thread pool configurable too? Otherwise I don't think it makes much sense
> just to make the FIX sender's thread pool configurable.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>>
>>> Author: hiranya
>>> Date: Tue May 12 08:00:27 2009
>>> New Revision: 773818
>>>
>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>> Log:
>>> Enhancements and code cleanup in the FIX transport:
>>> * FIX sender now has its own worker pool and hence does not rely on the
>>> FIX listener any more. Therefore listener and sender can be enabled
>>> individually
>>> * Made FIXSessionFactory a singleton to effectively share session data
>>> among the listener and the sender
>>> * Cleanup logic for initiators during sender shutdown
>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
>>> 259 and similar scenarios from operating properly
>>>
>>> Modified:
>>>
>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>
>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>
>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>
>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> Tue May 12 08:00:27 2009
>>> @@ -27,15 +27,12 @@
>>>  public class FIXApplicationFactory {
>>>
>>>     private ConfigurationContext cfgCtx;
>>> -    private WorkerPool workerPool;
>>> -
>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
>>> workerPool) {
>>>
>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>         this.cfgCtx = cfgCtx;
>>> -        this.workerPool = workerPool;
>>>     }
>>>
>>> -    public Application getFIXApplication(AxisService service, boolean
>>> acceptor) {
>>> +    public Application getFIXApplication(AxisService service, WorkerPool
>>> workerPool, boolean acceptor) {
>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
>>> acceptor);
>>>     }
>>>  }
>>> \ No newline at end of file
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> Tue May 12 08:00:27 2009
>>> @@ -23,6 +23,7 @@
>>>  import org.apache.axis2.description.AxisService;
>>>  import org.apache.axis2.description.Parameter;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>  import org.apache.commons.logging.Log;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>> @@ -65,16 +66,29 @@
>>>     /** A Map containing all the FIX applications created for initiators,
>>> keyed by FIX EPR */
>>>     private Map<String, Application> applicationStore;
>>>     /** An ApplicationFactory handles creating FIX Applications
>>> (FIXIncomingMessageHandler Objects) */
>>> -    private FIXApplicationFactory applicationFactory;
>>> +    private static FIXApplicationFactory applicationFactory = null;
>>> +
>>> +    private WorkerPool listenerThreadPool;
>>> +    private WorkerPool senderThreadPool;
>>>
>>>     private Log log;
>>>
>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
>>> -        this.applicationFactory = applicationFactory;
>>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
>>> +
>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory
>>> af) {
>>> +        if (applicationFactory == null) {
>>> +            applicationFactory = af;
>>> +        }
>>> +        return INSTANCE;
>>> +    }
>>> +
>>> +    private FIXSessionFactory() {
>>>         this.log = LogFactory.getLog(this.getClass());
>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>>         this.applicationStore = new HashMap<String, Application>();
>>> +        this.listenerThreadPool = null;
>>> +        this.senderThreadPool = null;
>>>     }
>>>
>>>     /**
>>> @@ -101,7 +115,7 @@
>>>                 MessageFactory messageFactory = new
>>> DefaultMessageFactory();
>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>> settings, true);
>>>                 //Get a new FIX Application
>>> -                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, true);
>>> +                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>                 //Create a new FIX Acceptor
>>>                 Acceptor acceptor = new SocketAcceptor(
>>>                         messageHandler,
>>> @@ -174,7 +188,7 @@
>>>         MessageStoreFactory storeFactory =
>>> getMessageStoreFactory(service, settings, false);
>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>>         //Get a new FIX application
>>> -        Application messageHandler =
>>> applicationFactory.getFIXApplication(service, false);
>>> +        Application messageHandler =
>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>         try {
>>>            //Create a new FIX initiator
>>> @@ -216,7 +230,7 @@
>>>                 MessageFactory messageFactory = new
>>> DefaultMessageFactory();
>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>> settings, true);
>>>                 //Get a new FIX Application
>>> -                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, false);
>>> +                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>                 Initiator initiator = new SocketInitiator(
>>>                     messageHandler,
>>> @@ -246,10 +260,10 @@
>>>             }
>>>
>>>         } else {
>>> -            String msg = "The " +
>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> -                    "not specified. Unable to initialize the initiator
>>> session at this stage.";
>>> -            log.info(msg);
>>> -            throw new AxisFault(msg);
>>> +            // FIX initiator session is not configured
>>> +            // It could be intentional - So not an error (we don't need
>>> initiators at all times)
>>> +            log.info("The " +
>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> +                    "not specified. Unable to initialize the initiator
>>> session at this stage.");
>>>         }
>>>     }
>>>
>>> @@ -276,6 +290,24 @@
>>>     }
>>>
>>>     /**
>>> +     * Stops all the FIX initiators created so far and cleans up all the
>>> mappings
>>> +     * related to them
>>> +     */
>>> +    public void disposeFIXInitiators() {
>>> +        boolean debugEnabled = log.isDebugEnabled();
>>> +
>>> +        for (String key : initiatorStore.keySet()) {
>>> +            initiatorStore.get(key).stop();
>>> +            if (debugEnabled) {
>>> +                log.debug("FIX initiator to the EPR " + key + "
>>> stopped");
>>> +            }
>>> +        }
>>> +
>>> +        initiatorStore.clear();
>>> +        applicationStore.clear();
>>> +    }
>>> +
>>> +    /**
>>>      * Returns an array of Strings representing EPRs for the specified
>>> service
>>>      *
>>>      * @param serviceName the name of the service
>>> @@ -444,6 +476,14 @@
>>>         }
>>>         return app;
>>>     }
>>> +
>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>> +        this.listenerThreadPool = listenerThreadPool;
>>> +    }
>>> +
>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>> +        this.senderThreadPool = senderThreadPool;
>>> +    }
>>>  }
>>>
>>>
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> Tue May 12 08:00:27 2009
>>> @@ -60,14 +60,8 @@
>>>                      TransportInDescription trpInDesc) throws AxisFault {
>>>
>>>         super.init(cfgCtx, trpInDesc);
>>> -        //initialize the FIXSessionFactory
>>> -        fixSessionFactory = new FIXSessionFactory(
>>> -                new FIXApplicationFactory(this.cfgCtx,
>>> this.workerPool));
>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>> -
>>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>> -        if (sender != null) {
>>> -            sender.setSessionFactory(fixSessionFactory);
>>> -        }
>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>> FIXApplicationFactory(cfgCtx));
>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>         log.info("FIX transport listener initialized...");
>>>     }
>>>
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> Tue May 12 08:00:27 2009
>>> @@ -28,6 +28,8 @@
>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>>  import quickfix.field.*;
>>> @@ -51,17 +53,12 @@
>>>
>>>     private FIXSessionFactory sessionFactory;
>>>     private FIXOutgoingMessageHandler messageSender;
>>> +    private WorkerPool workerPool;
>>>
>>>     public FIXTransportSender() {
>>>         this.log = LogFactory.getLog(this.getClass());
>>>     }
>>>
>>> -
>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>> -        this.sessionFactory = sessionFactory;
>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>> -    }
>>> -
>>>     /**
>>>      * @param cfgCtx       the axis2 configuration context
>>>      * @param transportOut the Out Transport description
>>> @@ -69,10 +66,25 @@
>>>      */
>>>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
>>> transportOut) throws AxisFault {
>>>         super.init(cfgCtx, transportOut);
>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>> FIXApplicationFactory(cfgCtx));
>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>> +                            10, 20, 5, -1, "FIX Sender Worker thread
>>> group", "FIX-Worker");
>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>         messageSender = new FIXOutgoingMessageHandler();
>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>         log.info("FIX transport sender initialized...");
>>>     }
>>>
>>> +    public void stop() {
>>> +        try {
>>> +            this.workerPool.shutdown(10000);
>>> +        } catch (InterruptedException e) {
>>> +            log.warn("Thread interrupted while waiting for worker pool
>>> to shut down");
>>> +        }
>>> +        sessionFactory.disposeFIXInitiators();
>>> +        super.stop();
>>> +    }
>>> +
>>>     /**
>>>      * Performs the actual sending of the message.
>>>      *
>>>
>>>
>>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: ruwan@wso2.com; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>>
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>



-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: ruwan@wso2.com; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com

Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java

Posted by Hiranya Jayathilaka <hi...@gmail.com>.
Hi Ruwan,

On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ru...@gmail.com>wrote:

> Hiranya,
>
> If you can make the worker pool configurable that would be of much
> importance... you may have a look at the nhttp transport thread pool, which
> can be configurable via the nhttp.properties file.


Currently the FIX sender initializes the WorkerPool in a manner similar to
the AbstractTransportListener. The WorkerPool in AbstractTransportListener
is used by several transports (JMS, Mail etc) via inheritance. FIX listener
also makes use of the same thread pool. Do we have any plans to make that
thread pool configurable too? Otherwise I don't think it makes much sense
just to make the FIX sender's thread pool configurable.

Thanks,
Hiranya


>
> Thanks,
> Ruwan
>
> On Tue, May 12, 2009 at 1:30 PM, <hi...@apache.org> wrote:
>
>> Author: hiranya
>> Date: Tue May 12 08:00:27 2009
>> New Revision: 773818
>>
>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>> Log:
>> Enhancements and code cleanup in the FIX transport:
>> * FIX sender now has its own worker pool and hence does not rely on the
>> FIX listener any more. Therefore listener and sender can be enabled
>> individually
>> * Made FIXSessionFactory a singleton to effectively share session data
>> among the listener and the sender
>> * Cleanup logic for initiators during sender shutdown
>> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
>> 259 and similar scenarios from operating properly
>>
>> Modified:
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> Tue May 12 08:00:27 2009
>> @@ -27,15 +27,12 @@
>>  public class FIXApplicationFactory {
>>
>>     private ConfigurationContext cfgCtx;
>> -    private WorkerPool workerPool;
>> -
>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
>> workerPool) {
>>
>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>         this.cfgCtx = cfgCtx;
>> -        this.workerPool = workerPool;
>>     }
>>
>> -    public Application getFIXApplication(AxisService service, boolean
>> acceptor) {
>> +    public Application getFIXApplication(AxisService service, WorkerPool
>> workerPool, boolean acceptor) {
>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
>> acceptor);
>>     }
>>  }
>> \ No newline at end of file
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> Tue May 12 08:00:27 2009
>> @@ -23,6 +23,7 @@
>>  import org.apache.axis2.description.AxisService;
>>  import org.apache.axis2.description.Parameter;
>>  import org.apache.axis2.transport.base.BaseUtils;
>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>  import org.apache.commons.logging.Log;
>>  import org.apache.commons.logging.LogFactory;
>>  import quickfix.*;
>> @@ -65,16 +66,29 @@
>>     /** A Map containing all the FIX applications created for initiators,
>> keyed by FIX EPR */
>>     private Map<String, Application> applicationStore;
>>     /** An ApplicationFactory handles creating FIX Applications
>> (FIXIncomingMessageHandler Objects) */
>> -    private FIXApplicationFactory applicationFactory;
>> +    private static FIXApplicationFactory applicationFactory = null;
>> +
>> +    private WorkerPool listenerThreadPool;
>> +    private WorkerPool senderThreadPool;
>>
>>     private Log log;
>>
>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
>> -        this.applicationFactory = applicationFactory;
>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
>> +
>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af)
>> {
>> +        if (applicationFactory == null) {
>> +            applicationFactory = af;
>> +        }
>> +        return INSTANCE;
>> +    }
>> +
>> +    private FIXSessionFactory() {
>>         this.log = LogFactory.getLog(this.getClass());
>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>         this.initiatorStore = new HashMap<String, Initiator>();
>>         this.applicationStore = new HashMap<String, Application>();
>> +        this.listenerThreadPool = null;
>> +        this.senderThreadPool = null;
>>     }
>>
>>     /**
>> @@ -101,7 +115,7 @@
>>                 MessageFactory messageFactory = new
>> DefaultMessageFactory();
>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>> settings, true);
>>                 //Get a new FIX Application
>> -                Application messageHandler =
>> applicationFactory.getFIXApplication(service, true);
>> +                Application messageHandler =
>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>                 //Create a new FIX Acceptor
>>                 Acceptor acceptor = new SocketAcceptor(
>>                         messageHandler,
>> @@ -174,7 +188,7 @@
>>         MessageStoreFactory storeFactory = getMessageStoreFactory(service,
>> settings, false);
>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>         //Get a new FIX application
>> -        Application messageHandler =
>> applicationFactory.getFIXApplication(service, false);
>> +        Application messageHandler =
>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>
>>         try {
>>            //Create a new FIX initiator
>> @@ -216,7 +230,7 @@
>>                 MessageFactory messageFactory = new
>> DefaultMessageFactory();
>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>> settings, true);
>>                 //Get a new FIX Application
>> -                Application messageHandler =
>> applicationFactory.getFIXApplication(service, false);
>> +                Application messageHandler =
>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>
>>                 Initiator initiator = new SocketInitiator(
>>                     messageHandler,
>> @@ -246,10 +260,10 @@
>>             }
>>
>>         } else {
>> -            String msg = "The " +
>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> -                    "not specified. Unable to initialize the initiator
>> session at this stage.";
>> -            log.info(msg);
>> -            throw new AxisFault(msg);
>> +            // FIX initiator session is not configured
>> +            // It could be intentional - So not an error (we don't need initiators at all times)
>> +            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> +                    "not specified. Unable to initialize the initiator session at this stage.");
>>         }
>>     }
>>
>> @@ -276,6 +290,24 @@
>>     }
>>
>>     /**
>> +     * Stops all the FIX initiators created so far and cleans up all the
>> mappings
>> +     * related to them
>> +     */
>> +    public void disposeFIXInitiators() {
>> +        boolean debugEnabled = log.isDebugEnabled();
>> +
>> +        for (String key : initiatorStore.keySet()) {
>> +            initiatorStore.get(key).stop();
>> +            if (debugEnabled) {
>> +                log.debug("FIX initiator to the EPR " + key + " stopped");
>> +            }
>> +        }
>> +
>> +        initiatorStore.clear();
>> +        applicationStore.clear();
>> +    }
>> +
>> +    /**
>>      * Returns an array of Strings representing EPRs for the specified
>> service
>>      *
>>      * @param serviceName the name of the service
>> @@ -444,6 +476,14 @@
>>         }
>>         return app;
>>     }
>> +
>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>> +        this.listenerThreadPool = listenerThreadPool;
>> +    }
>> +
>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>> +        this.senderThreadPool = senderThreadPool;
>> +    }
>>  }
>>
>>
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> Tue May 12 08:00:27 2009
>> @@ -60,14 +60,8 @@
>>                      TransportInDescription trpInDesc) throws AxisFault {
>>
>>         super.init(cfgCtx, trpInDesc);
>> -        //initialize the FIXSessionFactory
>> -        fixSessionFactory = new FIXSessionFactory(
>> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>> -
>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>> -        if (sender != null) {
>> -            sender.setSessionFactory(fixSessionFactory);
>> -        }
>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>> FIXApplicationFactory(cfgCtx));
>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>         log.info("FIX transport listener initialized...");
>>     }
>>
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> Tue May 12 08:00:27 2009
>> @@ -28,6 +28,8 @@
>>  import org.apache.axis2.transport.OutTransportInfo;
>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>  import org.apache.axis2.transport.base.BaseUtils;
>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>  import org.apache.commons.logging.LogFactory;
>>  import quickfix.*;
>>  import quickfix.field.*;
>> @@ -51,17 +53,12 @@
>>
>>     private FIXSessionFactory sessionFactory;
>>     private FIXOutgoingMessageHandler messageSender;
>> +    private WorkerPool workerPool;
>>
>>     public FIXTransportSender() {
>>         this.log = LogFactory.getLog(this.getClass());
>>     }
>>
>> -
>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>> -        this.sessionFactory = sessionFactory;
>> -        this.messageSender.setSessionFactory(sessionFactory);
>> -    }
>> -
>>     /**
>>      * @param cfgCtx       the axis2 configuration context
>>      * @param transportOut the Out Transport description
>> @@ -69,10 +66,25 @@
>>      */
>>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
>> transportOut) throws AxisFault {
>>         super.init(cfgCtx, transportOut);
>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>> FIXApplicationFactory(cfgCtx));
>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>> +                            10, 20, 5, -1, "FIX Sender Worker thread group", "FIX-Worker");
>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>         messageSender = new FIXOutgoingMessageHandler();
>> +        messageSender.setSessionFactory(this.sessionFactory);
>>         log.info("FIX transport sender initialized...");
>>     }
>>
>> +    public void stop() {
>> +        try {
>> +            this.workerPool.shutdown(10000);
>> +        } catch (InterruptedException e) {
>> +            log.warn("Thread interrupted while waiting for worker pool to shut down");
>> +        }
>> +        sessionFactory.disposeFIXInitiators();
>> +        super.stop();
>> +    }
>> +
>>     /**
>>      * Performs the actual sending of the message.
>>      *
>>
>>
>>
>
>
> --
> Ruwan Linton
> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
> WSO2 Inc.; http://wso2.org
> email: ruwan@wso2.com; cell: +94 77 341 3097
> blog: http://ruwansblog.blogspot.com
>



-- 
Hiranya Jayathilaka
Software Engineer;
WSO2 Inc.;  http://wso2.org
E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
Blog: http://techfeast-hiranya.blogspot.com