You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2010/05/17 11:37:15 UTC

svn commit: r945047 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXSessionFactory.java FIXTransportListener.java

Author: hiranya
Date: Mon May 17 09:37:15 2010
New Revision: 945047

URL: http://svn.apache.org/viewvc?rev=945047&view=rev
Log:
Fixing SYNAPSE-649

A FIX proxy can now be exposed purely in initiator mode. A service can act as an acceptor, an initiator, or both.


Modified:
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
    synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=945047&r1=945046&r2=945047&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Mon May 17 09:37:15 2010
@@ -97,9 +97,10 @@ public class FIXSessionFactory {
      * acceptorStore keyed by the service name and start it.
      *
      * @param service the AxisService
+     * @return true if the acceptor was created and started properly and false otherwise
      * @throws AxisFault if the acceptor cannot be created
      */
-    public void createFIXAcceptor(AxisService service) throws AxisFault {
+    public boolean createFIXAcceptor(AxisService service) throws AxisFault {
 
         //Try to get an InputStream to the FIX configuration file
         InputStream fixConfigStream = getFIXConfigAsStream(service, true);
@@ -107,7 +108,8 @@ public class FIXSessionFactory {
         if (fixConfigStream != null) {
             try {
                 if (log.isDebugEnabled()) {
-                    log.debug ("Initializing a new FIX session for the service " + service.getName());
+                    log.debug ("Initializing a new FIX session for the service " +
+                            service.getName());
                 }
 
                 SessionSettings settings = new SessionSettings(fixConfigStream);
@@ -115,7 +117,8 @@ public class FIXSessionFactory {
                 MessageFactory messageFactory = new DefaultMessageFactory();
                 quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
                 //Get a new FIX Application
-                Application messageHandler = applicationFactory.getFIXApplication(service, listenerThreadPool, true);
+                Application messageHandler = applicationFactory.getFIXApplication(service,
+                        listenerThreadPool, true);
                 //Create a new FIX Acceptor
                 Acceptor acceptor = new SocketAcceptor(
                         messageHandler,
@@ -126,18 +129,17 @@ public class FIXSessionFactory {
 
                 acceptorStore.put(service.getName(),acceptor);
                 acceptor.start();
-                return;
+                return true;
             } catch (ConfigError e) {
                 String msg = "Error in the specified FIX configuration. Unable to initialize a " +
                         "FIX session for the service " + service.getName();
                 log.error(msg, e);
                 throw new AxisFault(msg, e);
             }
+
+        } else {
+            return false;
         }
-        String msg = "Unable to initialize a FIX acceptor session for the service "
-                + service.getName();
-        log.error(msg);
-        throw new AxisFault(msg);
     }
 
     /**
@@ -150,7 +152,8 @@ public class FIXSessionFactory {
      * @param sessionID the SessionID of the session created
      * @throws org.apache.axis2.AxisFault Exception thrown
      */
-    public void createFIXInitiator(String fixEPR, AxisService service, SessionID sessionID) throws AxisFault {
+    public void createFIXInitiator(String fixEPR, AxisService service,
+                                   SessionID sessionID) throws AxisFault {
 
         if (log.isDebugEnabled()) {
             log.debug("Initializing a new FIX initiator for the service " + service.getName());
@@ -160,15 +163,19 @@ public class FIXSessionFactory {
 
         if (fixConfigStream == null) {
             settings = new SessionSettings();
-            settings.setLong(sessionID, FIXConstants.HEART_BY_INT, FIXConstants.DEFAULT_HEART_BT_INT_VALUE);
-            settings.setString(sessionID, FIXConstants.START_TIME, FIXConstants.DEFAULT_START_TIME_VALUE);
-            settings.setString(sessionID, FIXConstants.END_TIME, FIXConstants.DEFAULT_END_TIME_VALUE);
+            settings.setLong(sessionID, FIXConstants.HEART_BY_INT,
+                    FIXConstants.DEFAULT_HEART_BT_INT_VALUE);
+            settings.setString(sessionID, FIXConstants.START_TIME,
+                    FIXConstants.DEFAULT_START_TIME_VALUE);
+            settings.setString(sessionID, FIXConstants.END_TIME,
+                    FIXConstants.DEFAULT_END_TIME_VALUE);
         } else {
             try {
                 settings = new SessionSettings(fixConfigStream);
             } catch (ConfigError e) {
-                throw new AxisFault("Error in the specified FIX configuration for the initiaotr. Unable " +
-                        "to initialize a FIX session for the service " + service.getName(), e);
+                throw new AxisFault("Error in the specified FIX configuration for the initiaotr. " +
+                        "Unable to initialize a FIX session for the service " +
+                        service.getName(), e);
             }
         }
 
@@ -188,7 +195,8 @@ public class FIXSessionFactory {
         MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false);
         MessageFactory messageFactory = new DefaultMessageFactory();
         //Get a new FIX application
-        Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
+        Application messageHandler = applicationFactory.getFIXApplication(service,
+                senderThreadPool, false);
 
         try {
            //Create a new FIX initiator
@@ -213,7 +221,7 @@ public class FIXSessionFactory {
         } catch (InterruptedException ignore) { }
     }
 
-    public void createFIXInitiator(AxisService service) throws AxisFault {
+    public boolean createFIXInitiator(AxisService service) throws AxisFault {
 
         InputStream fixConfigStream = getFIXConfigAsStream(service, false);
         if (fixConfigStream != null) {
@@ -230,7 +238,8 @@ public class FIXSessionFactory {
                 MessageFactory messageFactory = new DefaultMessageFactory();
                 quickfix.LogFactory logFactory = getLogFactory(service, settings, false);
                 //Get a new FIX Application
-                Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
+                Application messageHandler = applicationFactory.getFIXApplication(service,
+                        senderThreadPool, false);
 
                 Initiator initiator = new SocketInitiator(
                     messageHandler,
@@ -245,6 +254,8 @@ public class FIXSessionFactory {
                     applicationStore.put(EPR, messageHandler);
                 }
                 initiator.start();
+                return true;
+
             } catch (FieldConvertError e) {
                 String msg = "FIX configuration file for the initiator session of the service " +
                         service.getName() + " is either incomplete or invalid." +
@@ -265,6 +276,8 @@ public class FIXSessionFactory {
             log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
                     "not specified. Unable to initialize the initiator session at this stage.");
         }
+
+        return false;
     }
 
 
@@ -376,8 +389,8 @@ public class FIXSessionFactory {
             } catch (IOException e) {
                 log.error("Error while reading from the URL " + fixConfigURLValue, e);
             }
-        } else {
-            log.info("FIX configuration URL is not specified for the service " + service.getName());
+        } else if (log.isDebugEnabled()) {
+            log.debug("FIX configuration URL is not specified for the service " + service.getName());
         }
 
         return fixConfigStream;

Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=945047&r1=945046&r2=945047&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Mon May 17 09:37:15 2010
@@ -73,8 +73,15 @@ public class FIXTransportListener extend
      */
     protected void startListeningForService(AxisService service) {
         try {
-            fixSessionFactory.createFIXAcceptor(service);
-            fixSessionFactory.createFIXInitiator(service);
+            boolean acceptorCreated = fixSessionFactory.createFIXAcceptor(service);
+            boolean initiatorCreated = fixSessionFactory.createFIXInitiator(service);
+
+            if (!acceptorCreated && !initiatorCreated) {
+                log.warn("No acceptor or initiator has been configured for the " +
+                        "service " + service.getName() + " - Disabling the FIX transport for " +
+                        "this service");
+                disableTransportForService(service);
+            }
         } catch (AxisFault axisFault) {
             disableTransportForService(service);
         }