You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2008/07/10 07:47:43 UTC

svn commit: r675450 - in /synapse/trunk/java: modules/transports/src/main/java/org/apache/synapse/transport/fix/ src/site/xdoc/

Author: hiranya
Date: Wed Jul  9 22:47:42 2008
New Revision: 675450

URL: http://svn.apache.org/viewvc?rev=675450&view=rev
Log:
Made some changes to the FIX transport impl to resolve the issue SYNAPSE-380.
Now the initiator sessions are created at startup if all the necessary settings are specified in the FIX configuration files.
Most of the changes are done to the FIXSessionFactory class. A new method createFIXInitiaotr(AxisService) was added and it is called from the transport listener at the startup. Couple of utility methods were added to the FIXUtils class to support new functionality.
Relevant sections in the documentation were updated as necessary.


Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
    synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml
    synapse/trunk/java/src/site/xdoc/Synapse_Samples_Setup.xml

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=675450&r1=675449&r2=675450&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Wed Jul  9 22:47:42 2008
@@ -61,7 +61,7 @@
     private Map<String,Acceptor> acceptorStore;
     /** A Map containing all the FIX Initiators created by this factory, keyed by FIX EPR */
     private Map<String,Initiator> initiatorStore;
-    /** A Map containing all the FIX application created for initiators, keyed by FIX EPR */
+    /** A Map containing all the FIX applications created for initiators, keyed by FIX EPR */
     private Map<String, Application> applicationStore;
     /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */
     private FIXApplicationFactory applicationFactory;
@@ -116,7 +116,7 @@
                         "FIX session for the service " + service.getName(), e);
             }
         }
-        log.error("Unable to initialize a FIX session for the service " + service.getName());
+        log.info("Unable to initialize a FIX acceptor session for the service " + service.getName());
     }
 
     /**
@@ -146,8 +146,8 @@
             try {
                 settings = new SessionSettings(fixConfigStream);
             } catch (ConfigError e) {
-                throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " +
-                        "FIX session for the service " + service.getName(), e);
+                throw new AxisFault("Error in the specified FIX configuration for the initiaotr. Unable " +
+                        "to initialize a FIX session for the service " + service.getName(), e);
             }
         }
 
@@ -187,11 +187,63 @@
             fixMessageHandler.acquire();
 
         } catch (ConfigError e) {
-            throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " +
-                    "FIX initiator.", e);
+            throw new AxisFault("Error in the specified FIX configuration for the initiator. Unable " +
+                    "to initialize a FIX initiator.", e);
         } catch (InterruptedException ignore) { }
     }
 
+    public void createFIXInitiator(AxisService service) {
+
+        InputStream fixConfigStream = getFIXConfigAsStream(service, false);
+        if (fixConfigStream != null) {
+
+            if (log.isDebugEnabled()) {
+                log.debug("Attempting to initialize a new FIX initiator " +
+                    "for the service " + service.getName());
+            }
+
+            try {
+                SessionSettings settings = new SessionSettings(fixConfigStream);
+
+                MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, true);
+                MessageFactory messageFactory = new DefaultMessageFactory();
+                quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
+                //Get a new FIX Application
+                Application messageHandler = applicationFactory.getFIXApplication(service, false);
+
+                Initiator initiator = new SocketInitiator(
+                    messageHandler,
+                    storeFactory,
+                    settings,
+                    logFactory,
+                    messageFactory);
+
+                String[] EPRs = FIXUtils.getEPRs(settings);
+                for (int i=0; i<EPRs.length; i++) {
+                    initiatorStore.put(EPRs[i], initiator);
+                    applicationStore.put(EPRs[i], messageHandler);
+                }
+                initiator.start();
+            }
+            catch (FieldConvertError e) {
+                log.info("FIX configuration file for the initiator session of the service " +
+                        service.getName() + " is either incomplete or invalid." +
+                        " Not creating the initiator session at this stage.");
+            }
+            catch (ConfigError e) {
+                log.info("FIX configuration file for the initiator session of the service " +
+                        service.getName() + " is either incomplete or invalid." +
+                        " Not creating the initiator session at this stage.");
+            }
+
+        }
+        else {
+            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is not specified. " +
+                    "Unable to initialize the initiator session at this stage.");
+        }
+    }
+
+
     /**
      * Get the FIX Acceptor for the specified service from the sessionStore Map and
      * stop it. Then remove the Acceptor from the Map.
@@ -282,7 +334,7 @@
                 log.error("Error while reading from the URL " + fixConfigURLValue, e);
             }
         } else {
-            log.error("FIX configuration URL is not specified for the service " + service.getName());
+            log.info("FIX configuration URL is not specified for the service " + service.getName());
         }
 
         return fixConfigStream;
@@ -368,6 +420,21 @@
     }
     
     public Application getApplication(String fixEPR) {
-        return applicationStore.get(fixEPR);
+        Application app = applicationStore.get(fixEPR);
+        if (app == null) {
+            Iterator<String> keys = applicationStore.keySet().iterator();
+            while (keys.hasNext()) {
+                String epr = keys.next();
+                if (FIXUtils.compareURLs(epr, fixEPR)) {
+                    app = applicationStore.get(epr);
+                    applicationStore.remove(epr);
+                    applicationStore.put(fixEPR, app);
+                    break;
+                }
+            }
+        }
+        return app;
     }
-}
\ No newline at end of file
+}
+
+

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=675450&r1=675449&r2=675450&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Wed Jul  9 22:47:42 2008
@@ -25,7 +25,6 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.synapse.transport.base.AbstractTransportListener;
-import org.apache.synapse.transport.base.ManagementSupport;
 
 /**
  * The FIX transport listener implementation. A FIX Transport Listner will hold
@@ -77,6 +76,7 @@
      */
     protected void startListeningForService(AxisService service) {
         fixSessionFactory.createFIXAcceptor(service);
+        fixSessionFactory.createFIXInitiator(service);
     }
 
     /**

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java?rev=675450&r1=675449&r2=675450&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java Wed Jul  9 22:47:42 2008
@@ -332,6 +332,65 @@
         return EPRList;
     }
 
+    public static String[] getEPRs(SessionSettings settings) throws FieldConvertError, ConfigError {
+        Iterator<SessionID> sessions = settings.sectionIterator();
+        String[] EPRs = new String[settings.size()];
+        int i = 0;
+        while (sessions.hasNext()) {
+            SessionID session = sessions.next();
+            String EPR = FIXConstants.FIX_PREFIX;
+            String paramValue = null;
+
+            EPR += settings.getString(session, FIXConstants.SOCKET_CONNECT_HOST);
+            EPR += ":" + settings.getString(session, FIXConstants.SOCKET_CONNECT_PORT);
+            EPR += "?" + FIXConstants.BEGIN_STRING + "=";
+            EPR += settings.getString(session, FIXConstants.BEGIN_STRING);
+            EPR += "&" + FIXConstants.SENDER_COMP_ID + "=";
+            EPR += settings.getString(session, FIXConstants.SENDER_COMP_ID);
+            EPR += "&" + FIXConstants.TARGET_COMP_ID + "=";
+            EPR += settings.getString(session, FIXConstants.TARGET_COMP_ID);
+
+            try {
+                paramValue = settings.getString(session, FIXConstants.SENDER_SUB_ID);
+                if (paramValue != null) {
+                   EPR += "&" + FIXConstants.SENDER_SUB_ID + "=";
+                   EPR += paramValue;
+                }
+            }
+            catch (ConfigError ignore) { }
+
+            try {
+                paramValue = settings.getString(session, FIXConstants.SENDER_LOCATION_ID);
+                if (paramValue != null) {
+                   EPR += "&" + FIXConstants.SENDER_LOCATION_ID + "=";
+                   EPR += paramValue;
+                }
+            }
+            catch (ConfigError ignore) { }
+
+            try {
+                paramValue = settings.getString(session, FIXConstants.TARGET_SUB_ID);
+                if (paramValue != null) {
+                   EPR += "&" + FIXConstants.TARGET_SUB_ID + "=";
+                   EPR += paramValue;
+                }
+            }
+            catch (ConfigError ignore) { }
+
+            try {
+                paramValue = settings.getString(session, FIXConstants.TARGET_LOCATION_ID);
+                if (paramValue != null) {
+                   EPR += "&" + FIXConstants.TARGET_LOCATION_ID + "=";
+                   EPR += paramValue;
+                }
+            }
+            catch (ConfigError ignore) { }
+
+            EPRs[i] = EPR;
+        }
+        return EPRs;
+    }
+
     /**
      * Extracts parameters embedded in FIX EPRs
      *
@@ -354,6 +413,36 @@
         return h;
     }
 
+    /**
+     * Compares two given FIX URL strings. The second URL is considered equal to the
+     * first URL if all the properties in the first URL also exist in the second URL
+     * and if they have equals values.
+     *
+     * @param url1 a FIX URL String
+     * @param url2 a FIX URL String
+     * @return a boolean value
+     */
+    public static boolean compareURLs(String url1, String url2) {
+        if (!url1.substring(0, url1.indexOf("?")).equals(url2.substring(0, url2.indexOf("?")))) {
+             return false;
+        }
+        else {
+            Hashtable<String,String> properties1 = getProperties(url1);
+            Hashtable<String, String> properties2 = getProperties(url2);
+            Iterator<String> keys = properties1.keySet().iterator();
+            while (keys.hasNext()) {
+                String key = keys.next();
+                if (!properties2.containsKey(key)) {
+                    return false;
+                }
+                else if (!properties1.get(key).equals(properties2.get(key))) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
     /*
      * This is here because AXIOM does not support removing CDATA tags yet. Given a String embedded in
      * CDATA tags this method will return the String element only.

Modified: synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml?rev=675450&r1=675449&r2=675450&view=diff
==============================================================================
--- synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml (original)
+++ synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml Wed Jul  9 22:47:42 2008
@@ -3768,18 +3768,15 @@
       fix-synapse.cfg file you created. Also make sure that transport.fix.
       InitiatorConfigURL property points to the synapse-sender.cfg file you
       created. Once done you can start the Synapse configuration numbered 257:
-      i.e. synapse -sample 257. Note that Synapse creates a new FIX session with
-      Banzai at this point.
+      i.e. synapse -sample 257. Note that Synapse creates new FIX sessions with
+      Banzai and Executor at this point.
     </p>
     <p>
       Send an order request from Banzai to Synapse.
     </p>
     <p>
-      Synapse will create a session with Executor and forward the order request. The
-      responses coming from the Executor will be sent back to Banzai. It will take a
-      couple of seconds for the responses to arrive for the first request since Synapse
-      has to establish a session with the Executor. But all the subsequent messages
-      will pass through almost instantaneously.
+      Synapse will forward the order request to the session with the Executor. The
+      responses coming from the Executor will be sent back to Banzai.
     </p>
       <p>
           To get an idea about the various transport parameters being used in this sample
@@ -3847,7 +3844,7 @@
       <pre>ant fixclient -Dsymbol=IBM -Dqty=5 -Dmode=buy -Daddurl=http://localhost:8280/services/FIXProxy</pre>
     </p>
     <p>
-      Synapse will create a session with Executor and forward the order request. The
+      Synapse will forward the order request to the session with the Executor. The
       first response coming from the Executor will be sent back over HTTP. Executor 
       generally sends two responses for each incoming order request. But since the
       response has to be forwarded over HTTP only one can be sent back to the client. 

Modified: synapse/trunk/java/src/site/xdoc/Synapse_Samples_Setup.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/src/site/xdoc/Synapse_Samples_Setup.xml?rev=675450&r1=675449&r2=675450&view=diff
==============================================================================
--- synapse/trunk/java/src/site/xdoc/Synapse_Samples_Setup.xml (original)
+++ synapse/trunk/java/src/site/xdoc/Synapse_Samples_Setup.xml Wed Jul  9 22:47:42 2008
@@ -597,6 +597,13 @@
 	EndTime=00:00:00
 	HeartBtInt=30
 	ReconnectInterval=5
+    SenderCompID=SYNAPSE
+    TargetCompID=EXEC
+    ConnectionType=initiator
+
+    [session]
+    BeginString=FIX.4.0
+    SocketConnectPort=19876
       </pre>
     </p>
     <p/>
@@ -755,7 +762,6 @@
        be forwarded to a FIX session with a different BeginString value this parameter can be
        set to 'true'. Setting this parameter to 'true' will enforce this restriction.
       </li>
-
     </ul>
     <p/>
     <h2 id="script">