You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2011/04/18 08:00:46 UTC

svn commit: r1094238 - /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/

Author: hiranya
Date: Mon Apr 18 06:00:45 2011
New Revision: 1094238

URL: http://svn.apache.org/viewvc?rev=1094238&view=rev
Log:
Implementing SYNAPSE-668 and some minor bug fixes in the FIXOutgoingMessageHandler

Added:
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
Modified:
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java Mon Apr 18 06:00:45 2011
@@ -54,6 +54,7 @@ public class FIXConstants {
     public static final long DEFAULT_HEART_BT_INT_VALUE = 30;
     public static final String DEFAULT_START_TIME_VALUE = "00:00:00";
     public static final String DEFAULT_END_TIME_VALUE = "00:00:00";
+    public static final int DEFAULT_COUNTER_UPPER_LIMIT = 1000000000;
 
     public static final String HEART_BY_INT = "HeartBtInt";
     public static final String BEGIN_STRING = "BeginString";
@@ -102,4 +103,11 @@ public class FIXConstants {
 
     public static final String FIX_DROP_EXTRA_RESPONSES = "transport.fix.DropExtraResponses";
 
+    public static final String FIX_ACCEPTOR_EVENT_HANDLER = "transport.fix.AcceptorSessionEventHandler";
+    public static final String FIX_INITIATOR_EVENT_HANDLER = "transport.fix.InitiatorSessionEventHandler";
+
+    //--------------------------- Message level properties -----------------------------------
+
+    public static final String FIX_IGNORE_ORDER = "transport.fix.IgnoreOrder";
+
 }
\ No newline at end of file

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java Mon Apr 18 06:00:45 2011
@@ -41,11 +41,12 @@ import quickfix.field.SenderCompID;
 import quickfix.field.TargetCompID;
 
 import javax.xml.namespace.QName;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FIXIncomingMessageHandler is responsible for handling all incoming FIX messages. This is where the
@@ -63,11 +64,12 @@ public class FIXIncomingMessageHandler i
     /** A boolean value indicating the type of the FIX application */
     private boolean acceptor;
     /** A Map of counters with one counter per session */
-    private Map<SessionID, Integer> countersMap;
+    private Map<SessionID, AtomicInteger> countersMap;
     private Queue<MessageContext> outgoingMessages;
-    private boolean allNewApproach;
-    private boolean dropExtraResponses;
+    private boolean allNewApproach = true;
+    private boolean dropExtraResponses = false;
     private Semaphore semaphore;
+    private SessionEventHandler eventHandler;
 
     public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool workerPool,
                              AxisService service, boolean acceptor) {
@@ -76,25 +78,43 @@ public class FIXIncomingMessageHandler i
         this.service = service;
         this.log = LogFactory.getLog(this.getClass());
         this.acceptor = acceptor;
-        countersMap = new HashMap<SessionID, Integer>();
+        countersMap = new ConcurrentHashMap<SessionID, AtomicInteger>();
         outgoingMessages = new LinkedBlockingQueue<MessageContext>();
         semaphore = new Semaphore(0);
         getResponseHandlingApproach();
+
+        Parameter eventHandlerParam;
+        if (acceptor) {
+            eventHandlerParam = service.getParameter(FIXConstants.FIX_ACCEPTOR_EVENT_HANDLER);
+        } else {
+            eventHandlerParam = service.getParameter(FIXConstants.FIX_INITIATOR_EVENT_HANDLER);
+        }
+
+        if (eventHandlerParam != null && eventHandlerParam.getValue() != null &&
+                !"".equals(eventHandlerParam.getValue())) {
+            try {
+                Class clazz = getClass().getClassLoader().loadClass(
+                        (String) eventHandlerParam.getValue());
+                eventHandler = (SessionEventHandler) clazz.newInstance();
+            } catch (ClassNotFoundException e) {
+                log.error("Unable to find the session event handler class: " +
+                        eventHandlerParam.getValue(), e);
+            } catch (Exception e) {
+                log.error("Error while initializing the session event handler class: " +
+                        eventHandlerParam.getValue(), e);
+            }
+        }
     }
 
     private void getResponseHandlingApproach() {
         Parameter param = service.getParameter(FIXConstants.FIX_RESPONSE_HANDLER_APPROACH);
         if (param != null && "false".equals(param.getValue().toString())) {
             allNewApproach = false;
-        } else {
-            allNewApproach = true;
         }
 
         Parameter dropResponsesParam = service.getParameter(FIXConstants.FIX_DROP_EXTRA_RESPONSES);
         if (dropResponsesParam != null && "true".equals(dropResponsesParam.getValue().toString())) {
             dropExtraResponses = true;
-        } else {
-            dropExtraResponses = false;
         }
     }
 
@@ -119,12 +139,15 @@ public class FIXIncomingMessageHandler i
      * Sessions exist whether or not a counter party is connected to it. As soon
      * as a session is created, the application can begin sending messages to it. If no one
      * is logged on, the messages will be sent at the time a connection is
-     * established with the counterparty.
+     * established with the counter party.
      *
      * @param sessionID QuickFIX session ID
      */
     public void onCreate(SessionID sessionID) {
         log.info("New FIX session created: " + sessionID.toString());
+        if (eventHandler != null) {
+            eventHandler.onCreate(sessionID);
+        }
     }
 
     /**
@@ -136,9 +159,15 @@ public class FIXIncomingMessageHandler i
      * @param sessionID QuickFIX session ID
      */
     public void onLogon(SessionID sessionID) {
-        countersMap.put(sessionID, 0);
+        if (!countersMap.containsKey(sessionID)) {
+            countersMap.put(sessionID, new AtomicInteger(0));
+        }
         log.info("FIX session logged on: " + sessionID.toString());
         semaphore.release();
+
+        if (eventHandler != null) {
+            eventHandler.onLogon(sessionID);
+        }
     }
 
     /**
@@ -149,11 +178,14 @@ public class FIXIncomingMessageHandler i
      * @param sessionID QuickFIX session ID
      */
     public void onLogout(SessionID sessionID) {
-        countersMap.put(sessionID, 0);
         FIXTransportSender trpSender = (FIXTransportSender) cfgCtx.getAxisConfiguration().
                 getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
         trpSender.logOutIncomingSession(sessionID);
         log.info("FIX session logged out: " + sessionID.toString());
+
+        if (eventHandler != null) {
+            eventHandler.onLogout(sessionID);
+        }
     }
 
     /**
@@ -182,6 +214,10 @@ public class FIXIncomingMessageHandler i
                 log.trace("Message: " + message.toString());
             }
         }
+
+        if (eventHandler != null) {
+            eventHandler.toAdmin(message, sessionID);
+        }
     }
 
     /**
@@ -209,11 +245,15 @@ public class FIXIncomingMessageHandler i
                 log.trace("Message: " + message.toString());
             }
         }
+
+        if (eventHandler != null) {
+            eventHandler.fromAdmin(message, sessionID);
+        }
     }
 
     /**
      * This is a callback for application messages that are being sent to a
-     * counterparty.
+     * counter party.
      *
      * @param message QuickFIX message
      * @param sessionID QuickFIX session ID
@@ -236,6 +276,10 @@ public class FIXIncomingMessageHandler i
                 log.trace("Message: " + message.toString());
             }
         }
+
+        if (eventHandler != null) {
+            eventHandler.toApp(message, sessionID);
+        }
     }
 
     /**
@@ -264,10 +308,12 @@ public class FIXIncomingMessageHandler i
             }
         }
 
-        int counter = countersMap.get(sessionID);
-        counter++;
-        countersMap.put(sessionID, counter);
-
+        AtomicInteger atomicCounter = countersMap.get(sessionID);
+        int counter = atomicCounter.incrementAndGet();
+        boolean rolled = atomicCounter.compareAndSet(FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT, 0);
+        if (rolled && log.isDebugEnabled()) {
+            log.debug("Incoming request counter rolled over for the session: " + sessionID);
+        }
         workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
     }
 
@@ -288,7 +334,12 @@ public class FIXIncomingMessageHandler i
         }
 
         private void handleIncomingRequest() {
-            //Create message context for the incmong message
+            if (log.isDebugEnabled()) {
+                log.debug("Source session: " + sessionID + " - Received message with sequence " +
+                        "number " + counter);
+            }
+
+            //Create message context for the incoming message
             AbstractTransportListener trpListener = (AbstractTransportListener) cfgCtx.getAxisConfiguration().
                     getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
 
@@ -311,16 +362,12 @@ public class FIXIncomingMessageHandler i
                     msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
                     msgCtx.setSoapAction("urn:" + operation.getName().getLocalPart());
                 }
-            } else {
-                log.warn("Service information not available for the FIX message processor");
-                return;
             }
 
             String fixApplication = FIXConstants.FIX_INITIATOR;
             if (acceptor) {
                 fixApplication = FIXConstants.FIX_ACCEPTOR;
-            }
-            else {
+            } else {
                 msgCtx.setProperty("synapse.isresponse", true);
             }
 

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java Mon Apr 18 06:00:45 2011
@@ -20,28 +20,32 @@
 package org.apache.synapse.transport.fix;
 
 import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import quickfix.Message;
 import quickfix.Session;
 import quickfix.SessionID;
 import quickfix.SessionNotFound;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * FIXOutgoingMessageHandler makes sure that messages are delivered in the order they were received by
- * a FIX acceptor. In case the message arrived over a different transport srill this class will try to
+ * a FIX acceptor. In case the message arrived over a different transport still this class will try to
  * put the messages in correct order based on the counter value of the message.
  */
 public class FIXOutgoingMessageHandler {
 
+    private static final Log log = LogFactory.getLog(FIXOutgoingMessageHandler.class);
+
     private Map<String, Integer> countersMap;
     private Map<String, Map<Integer,Object[]>> messagesMap;
     private FIXSessionFactory sessionFactory;
 
     public FIXOutgoingMessageHandler() {
-        countersMap = new HashMap<String, Integer>();
-        messagesMap = new HashMap<String, Map<Integer,Object[]>>();
+        countersMap = new ConcurrentHashMap<String, Integer>();
+        messagesMap = new ConcurrentHashMap<String, Map<Integer,Object[]>>();
     }
 
     public void setSessionFactory(FIXSessionFactory sessionFactory) {
@@ -64,35 +68,43 @@ public class FIXOutgoingMessageHandler {
     public synchronized void sendMessage(Message message, SessionID targetSession, String sourceSession,
                             int counter, MessageContext msgCtx, String targetEPR) throws SessionNotFound {
 
-        if (sourceSession != null && counter != -1) {
+        boolean ignoreOrder = "true".equals(msgCtx.getProperty(FIXConstants.FIX_IGNORE_ORDER));
+        if (sourceSession != null && counter != -1 && !ignoreOrder) {
 
             int expectedValue;
             if (countersMap.containsKey(sourceSession)) {
                 expectedValue = countersMap.get(sourceSession);
-            }
-            else {
+            } else {
                 //create new entries in the respective Maps
                 //counter starts at 1
                 countersMap.put(sourceSession, 1);
-                messagesMap.put(sourceSession, new HashMap<Integer,Object[]>());
+                messagesMap.put(sourceSession, new ConcurrentHashMap<Integer,Object[]>());
                 expectedValue = 1;
             }
 
             if (expectedValue == counter) {
                 sendToTarget(msgCtx, targetEPR, message, targetSession);
-                countersMap.put(sourceSession, expectedValue++);
+                if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Outgoing request counter rolled over for the session: " +
+                                sourceSession + " (from " + expectedValue + ")");
+                    }
+                    expectedValue = 1;
+                }
+                countersMap.put(sourceSession, ++expectedValue);
                 sendQueuedMessages(expectedValue, sourceSession);
             }
             else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Source session: " + sourceSession + " - Expected sequence number (" +
+                            expectedValue + ") does not match with the actual sequence number (" +
+                            counter + "). Holding the message back for later delivery.");
+                }
+
                 //save the message to be sent later...
                 Map<Integer,Object[]> messages = messagesMap.get(sourceSession);
-                Object[] obj = new Object[4];
-                obj[0] = message;
-                obj[1] = targetSession;
-                obj[2] = msgCtx;
-                obj[3] = targetEPR;
+                Object[] obj = new Object[] { message, targetSession, msgCtx, targetEPR } ;
                 messages.put(counter, obj);
-                messagesMap.put(sourceSession, messages);
             }
         }
         else {
@@ -144,11 +156,23 @@ public class FIXOutgoingMessageHandler {
             String targetEPR = null;
             if (obj[2] != null) {
                 msgCtx = (MessageContext) obj[2];
-                targetEPR = obj[3].toString();
+                targetEPR = (String) obj[3];
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Source session: " + session + " - Sending the previously queued message " +
+                        "with the sequence number: " + expectedValue);
             }
             sendToTarget(msgCtx, targetEPR, message, sessionID);
             messages.remove(expectedValue);
-            obj = messages.get(expectedValue++);
+            if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Outgoing request counter rolled over for the session: " + session +
+                        " (from " + expectedValue + ")");
+                }
+                expectedValue = 1;
+            }
+            obj = messages.get(++expectedValue);
         }
         messagesMap.put(session, messages);
         countersMap.put(session, expectedValue);
@@ -163,6 +187,12 @@ public class FIXOutgoingMessageHandler {
                 if (obj != null) {
                     Message message = (Message) obj[0];
                     SessionID sessionID = (SessionID) obj[1];
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Source session: " + session + " - Flushing the previously queued " +
+                                "message with the sequence number: " + expectedValue);
+                    }
+
                     try {
                         Session.sendToTarget(message, sessionID);
                     } catch (SessionNotFound ignore) { }

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Mon Apr 18 06:00:45 2011
@@ -98,7 +98,7 @@ public class FIXSessionFactory {
      * acceptorStore keyed by the service name and start it.
      *
      * @param service the AxisService
-     * @return true if the acceptor was created and started properly and false otherwise
+     * @return true if the acceptor is successfully initialized and false otherwise
      * @throws AxisFault if the acceptor cannot be created
      */
     public boolean createFIXAcceptor(AxisService service) throws AxisFault {
@@ -175,7 +175,7 @@ public class FIXSessionFactory {
             try {
                 settings = new SessionSettings(fixConfigStream);
             } catch (ConfigError e) {
-                throw new AxisFault("Error in the specified FIX configuration for the initiaotr. " +
+                throw new AxisFault("Error in the specified FIX configuration for the initiator. " +
                         "Unable to initialize a FIX session for the service " +
                         service.getName(), e);
             }
@@ -349,7 +349,7 @@ public class FIXSessionFactory {
      * @param serviceName the name of the AxisService
      * @return a FIX Acceptor for the service
      */
-    public Acceptor getAccepter(String serviceName) {
+    public Acceptor getAcceptor(String serviceName) {
         return acceptorStore.get(serviceName);
     }
 
@@ -361,7 +361,7 @@ public class FIXSessionFactory {
      */
    public Initiator getInitiator(String fixEPR) {
         return initiatorStore.get(fixEPR);
-    }
+   }
 
     /**
      * Get the FIX configuration URL from the services.xml.
@@ -391,8 +391,8 @@ public class FIXSessionFactory {
             } catch (IOException e) {
                 log.error("Error while reading from the URL " + fixConfigURLValue, e);
             }
-        } else if (log.isDebugEnabled()) {
-            log.debug("FIX configuration URL is not specified for the service " + service.getName());
+        } else {
+            log.info("FIX configuration URL is not specified for the service " + service.getName());
         }
 
         return fixConfigStream;
@@ -506,8 +506,7 @@ public class FIXSessionFactory {
             jmxExporter.setRegistrationBehavior(JmxExporter.REGISTRATION_IGNORE_EXISTING);
             jmxExporter.export(connector);
         } catch (JMException e) {
-            log.error("Error while initializing JMX support for the FIX sessions in " +
-                    "service: " + service, e);
+            log.error("Error while initializing JMX support for the service: " + service, e);
         }
     }
 }

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Mon Apr 18 06:00:45 2011
@@ -71,7 +71,7 @@ public class FIXTransportListener extend
      *
      * @param service the service for which to listen for messages
      */
-    protected void startListeningForService(AxisService service) {
+    public void startListeningForService(AxisService service) {
         try {
             boolean acceptorCreated = fixSessionFactory.createFIXAcceptor(service);
             boolean initiatorCreated = fixSessionFactory.createFIXInitiator(service);

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Mon Apr 18 06:00:45 2011
@@ -46,7 +46,7 @@ import java.util.Map;
  * <p/>
  * This transport sender implementation does not support forwarding FIX messages to sessions with
  * different BeginString values.When it performs a message forwarding it makes sure the forwarding
- * takes place according to the conditions specified in the 'Thirs Party Routing' section in the
+ * takes place according to the conditions specified in the 'Third Party Routing' section in the
  * FIX protocol specification.
  */
 public class FIXTransportSender extends AbstractTransportSender {
@@ -108,7 +108,7 @@ public class FIXTransportSender extends 
         try {
             fixMessage = FIXUtils.getInstance().createFIXMessage(msgCtx);
         } catch (IOException e) {
-            handleException("Exception occured while creating the FIX message from SOAP Envelope", e);
+            handleException("Exception occurred while creating the FIX message from SOAP Envelope", e);
         }
 
         if (FIXConstants.FIX_ACCEPTOR.equals(fixApplication)) {
@@ -118,11 +118,12 @@ public class FIXTransportSender extends 
                 sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
             } else if (outTransportInfo != null && outTransportInfo instanceof FIXOutTransportInfo) {
                 //Send the message back to the sender
-                sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage, sourceSession, counter);
+                sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage,
+                        sourceSession, counter, msgCtx);
             }
         } else if (FIXConstants.FIX_INITIATOR.equals(fixApplication)) {
 
-            if (sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter)) {
+            if (sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter, msgCtx)) {
                 return;
             } else if (targetEPR != null) {
                 sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
@@ -135,12 +136,12 @@ public class FIXTransportSender extends 
             if (targetEPR != null) {
                 sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
             } else {
-                sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter);
+                sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter, msgCtx);
             }
         }
     }
 
-    private boolean isTargetVald(Map<String, String> fieldValues, SessionID targetSession,
+    private boolean isTargetValid(Map<String, String> fieldValues, SessionID targetSession,
                                  boolean beginStrValidation) {
         
         String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
@@ -239,7 +240,7 @@ public class FIXTransportSender extends 
 
     /**
      * Puts DeliverToX fields in the message to enable the message to be forwarded at the destination.
-     * This method retireves the parameters from the services.xml and put them in the message as
+     * This method retrieves the parameters from the services.xml and puts them in the message as
      * DeliverToX fields. Should be used when a response message has to forwarded at the destination.
      *
      * @param message the FIX message to be forwarded
@@ -322,12 +323,14 @@ public class FIXTransportSender extends 
      * @param fixMessage  the FIX message to be sent
      * @param srcSession  String uniquely identifying the incoming session
      * @param counter     application level sequence number of the message
-     * @param serviceName name of the AxisSerivce for the message
+     * @param serviceName name of the AxisService for the message
+     * @param msgCtx      Axis2 MessageContext
      * @return boolean value indicating the result
      * @throws AxisFault on error
      */
     private boolean sendUsingTrpOutInfo(OutTransportInfo trpOutInfo, String serviceName,
-                                        Message fixMessage, String srcSession, int counter) throws AxisFault {
+                                        Message fixMessage, String srcSession, int counter,
+                                        MessageContext msgCtx) throws AxisFault {
 
         FIXOutTransportInfo fixOut = (FIXOutTransportInfo) trpOutInfo;
         SessionID sessionID = fixOut.getSessionID();
@@ -356,7 +359,7 @@ public class FIXTransportSender extends 
         }
 
         try {
-            messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, null, null);
+            messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, msgCtx, null);
             return true;
         } catch (SessionNotFound e) {
             log.error("Error while sending the FIX message. Session " + sessionID.toString() + " does" +
@@ -366,22 +369,23 @@ public class FIXTransportSender extends 
     }
 
     /**
-     * Send the message using a session in the aaceptor side
+     * Send the message using a session in the acceptor side
      *
      * @param serviceName the service of the message
      * @param fixMessage  the FIX message to be sent
      * @param srcSession  String uniquely identifying the incoming session
      * @param counter     the application level sequence number of the message
+     * @param msgCtx      Axis2 MessageContext
      * @return boolean value indicating the result
      * @throws AxisFault on error
      */
     private boolean sendUsingAcceptorSession(String serviceName, Message fixMessage, String srcSession,
-                                             int counter) throws AxisFault {
+                                             int counter, MessageContext msgCtx) throws AxisFault {
 
         Map<String, String> fieldValues = FIXUtils.getMessageForwardingParameters(fixMessage);
         String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
 
-        Acceptor acceptor = sessionFactory.getAccepter(serviceName);
+        Acceptor acceptor = sessionFactory.getAcceptor(serviceName);
         SessionID sessionID = null;
 
         AxisService service = cfgCtx.getAxisConfiguration().getService(serviceName);
@@ -390,14 +394,14 @@ public class FIXTransportSender extends 
             ArrayList<SessionID> sessions = acceptor.getSessions();
             if (sessions.size() == 1) {
                 sessionID = sessions.get(0);
-                if (deliverToCompID != null && !isTargetVald(fieldValues, sessionID, isValidationOn(service))) {
+                if (deliverToCompID != null && !isTargetValid(fieldValues, sessionID, isValidationOn(service))) {
                     sessionID = null;
                 }
 
             } else if (sessions.size() > 1 && deliverToCompID != null) {
-                for (int i = 0; i < sessions.size(); i++) {
-                    sessionID = sessions.get(i);
-                    if (isTargetVald(fieldValues, sessionID, isValidationOn(service))) {
+                for (SessionID session : sessions) {
+                    sessionID = session;
+                    if (isTargetValid(fieldValues, sessionID, isValidationOn(service))) {
                         break;
                     }
                 }
@@ -407,7 +411,8 @@ public class FIXTransportSender extends 
         if (sessionID != null) {
             //Found a valid session. Now forward the message...
             FIXOutTransportInfo fixOutInfo = new FIXOutTransportInfo(sessionID);
-            return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage, srcSession, counter);
+            return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage,
+                    srcSession, counter, msgCtx);
         }
         return false;
     }

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java Mon Apr 18 06:00:45 2011
@@ -213,7 +213,7 @@ public class FIXUtils {
                 List<Group> groupList = message.getGroups(groupKey);
                 Iterator<Group> groupIterator = groupList.iterator();
 
-                while(groupIterator.hasNext()) {
+                while (groupIterator.hasNext()) {
                     Group msgGroup = groupIterator.next();
                     OMElement groupField = soapFactory.createOMElement(FIXConstants.FIX_GROUP, null);
                     // rec. call the method to process the repeating groups

Added: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java?rev=1094238&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java (added)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java Mon Apr 18 06:00:45 2011
@@ -0,0 +1,36 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.fix;
+
+public interface SessionEventHandler {
+
+    void onCreate(quickfix.SessionID sessionID);
+
+    void onLogon(quickfix.SessionID sessionID);
+
+    void onLogout(quickfix.SessionID sessionID);
+
+    void toAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+    void fromAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+    void toApp(quickfix.Message message, quickfix.SessionID sessionID);
+
+}