You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2010/05/17 11:37:15 UTC
svn commit: r945047 - in
/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix:
FIXSessionFactory.java FIXTransportListener.java
Author: hiranya
Date: Mon May 17 09:37:15 2010
New Revision: 945047
URL: http://svn.apache.org/viewvc?rev=945047&view=rev
Log:
Fixing SYNAPSE-649
Now a FIX proxy can be exposed purely in the initiator mode. A service can be an acceptor, an initiator or both.
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=945047&r1=945046&r2=945047&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Mon May 17 09:37:15 2010
@@ -97,9 +97,10 @@ public class FIXSessionFactory {
* acceptorStore keyed by the service name and start it.
*
* @param service the AxisService
+ * @return true if the acceptor was created and started properly and false otherwise
* @throws AxisFault if the acceptor cannot be created
*/
- public void createFIXAcceptor(AxisService service) throws AxisFault {
+ public boolean createFIXAcceptor(AxisService service) throws AxisFault {
//Try to get an InputStream to the FIX configuration file
InputStream fixConfigStream = getFIXConfigAsStream(service, true);
@@ -107,7 +108,8 @@ public class FIXSessionFactory {
if (fixConfigStream != null) {
try {
if (log.isDebugEnabled()) {
- log.debug ("Initializing a new FIX session for the service " + service.getName());
+ log.debug ("Initializing a new FIX session for the service " +
+ service.getName());
}
SessionSettings settings = new SessionSettings(fixConfigStream);
@@ -115,7 +117,8 @@ public class FIXSessionFactory {
MessageFactory messageFactory = new DefaultMessageFactory();
quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
//Get a new FIX Application
- Application messageHandler = applicationFactory.getFIXApplication(service, listenerThreadPool, true);
+ Application messageHandler = applicationFactory.getFIXApplication(service,
+ listenerThreadPool, true);
//Create a new FIX Acceptor
Acceptor acceptor = new SocketAcceptor(
messageHandler,
@@ -126,18 +129,17 @@ public class FIXSessionFactory {
acceptorStore.put(service.getName(),acceptor);
acceptor.start();
- return;
+ return true;
} catch (ConfigError e) {
String msg = "Error in the specified FIX configuration. Unable to initialize a " +
"FIX session for the service " + service.getName();
log.error(msg, e);
throw new AxisFault(msg, e);
}
+
+ } else {
+ return false;
}
- String msg = "Unable to initialize a FIX acceptor session for the service "
- + service.getName();
- log.error(msg);
- throw new AxisFault(msg);
}
/**
@@ -150,7 +152,8 @@ public class FIXSessionFactory {
* @param sessionID the SessionID of the session created
* @throws org.apache.axis2.AxisFault Exception thrown
*/
- public void createFIXInitiator(String fixEPR, AxisService service, SessionID sessionID) throws AxisFault {
+ public void createFIXInitiator(String fixEPR, AxisService service,
+ SessionID sessionID) throws AxisFault {
if (log.isDebugEnabled()) {
log.debug("Initializing a new FIX initiator for the service " + service.getName());
@@ -160,15 +163,19 @@ public class FIXSessionFactory {
if (fixConfigStream == null) {
settings = new SessionSettings();
- settings.setLong(sessionID, FIXConstants.HEART_BY_INT, FIXConstants.DEFAULT_HEART_BT_INT_VALUE);
- settings.setString(sessionID, FIXConstants.START_TIME, FIXConstants.DEFAULT_START_TIME_VALUE);
- settings.setString(sessionID, FIXConstants.END_TIME, FIXConstants.DEFAULT_END_TIME_VALUE);
+ settings.setLong(sessionID, FIXConstants.HEART_BY_INT,
+ FIXConstants.DEFAULT_HEART_BT_INT_VALUE);
+ settings.setString(sessionID, FIXConstants.START_TIME,
+ FIXConstants.DEFAULT_START_TIME_VALUE);
+ settings.setString(sessionID, FIXConstants.END_TIME,
+ FIXConstants.DEFAULT_END_TIME_VALUE);
} else {
try {
settings = new SessionSettings(fixConfigStream);
} catch (ConfigError e) {
- throw new AxisFault("Error in the specified FIX configuration for the initiaotr. Unable " +
- "to initialize a FIX session for the service " + service.getName(), e);
+ throw new AxisFault("Error in the specified FIX configuration for the initiaotr. " +
+ "Unable to initialize a FIX session for the service " +
+ service.getName(), e);
}
}
@@ -188,7 +195,8 @@ public class FIXSessionFactory {
MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false);
MessageFactory messageFactory = new DefaultMessageFactory();
//Get a new FIX application
- Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
+ Application messageHandler = applicationFactory.getFIXApplication(service,
+ senderThreadPool, false);
try {
//Create a new FIX initiator
@@ -213,7 +221,7 @@ public class FIXSessionFactory {
} catch (InterruptedException ignore) { }
}
- public void createFIXInitiator(AxisService service) throws AxisFault {
+ public boolean createFIXInitiator(AxisService service) throws AxisFault {
InputStream fixConfigStream = getFIXConfigAsStream(service, false);
if (fixConfigStream != null) {
@@ -230,7 +238,8 @@ public class FIXSessionFactory {
MessageFactory messageFactory = new DefaultMessageFactory();
quickfix.LogFactory logFactory = getLogFactory(service, settings, false);
//Get a new FIX Application
- Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
+ Application messageHandler = applicationFactory.getFIXApplication(service,
+ senderThreadPool, false);
Initiator initiator = new SocketInitiator(
messageHandler,
@@ -245,6 +254,8 @@ public class FIXSessionFactory {
applicationStore.put(EPR, messageHandler);
}
initiator.start();
+ return true;
+
} catch (FieldConvertError e) {
String msg = "FIX configuration file for the initiator session of the service " +
service.getName() + " is either incomplete or invalid." +
@@ -265,6 +276,8 @@ public class FIXSessionFactory {
log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
"not specified. Unable to initialize the initiator session at this stage.");
}
+
+ return false;
}
@@ -376,8 +389,8 @@ public class FIXSessionFactory {
} catch (IOException e) {
log.error("Error while reading from the URL " + fixConfigURLValue, e);
}
- } else {
- log.info("FIX configuration URL is not specified for the service " + service.getName());
+ } else if (log.isDebugEnabled()) {
+ log.debug("FIX configuration URL is not specified for the service " + service.getName());
}
return fixConfigStream;
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=945047&r1=945046&r2=945047&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Mon May 17 09:37:15 2010
@@ -73,8 +73,15 @@ public class FIXTransportListener extend
*/
protected void startListeningForService(AxisService service) {
try {
- fixSessionFactory.createFIXAcceptor(service);
- fixSessionFactory.createFIXInitiator(service);
+ boolean acceptorCreated = fixSessionFactory.createFIXAcceptor(service);
+ boolean initiatorCreated = fixSessionFactory.createFIXInitiator(service);
+
+ if (!acceptorCreated && !initiatorCreated) {
+ log.warn("No acceptor or initiator has been configured for the " +
+ "service " + service.getName() + " - Disabling the FIX transport for " +
+ "this service");
+ disableTransportForService(service);
+ }
} catch (AxisFault axisFault) {
disableTransportForService(service);
}