You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2011/04/18 08:00:46 UTC
svn commit: r1094238 -
/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/
Author: hiranya
Date: Mon Apr 18 06:00:45 2011
New Revision: 1094238
URL: http://svn.apache.org/viewvc?rev=1094238&view=rev
Log:
Implementing SYNAPSE-668 and some minor bug fixes in the FIXOutgoingMessageHandler
Added:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java Mon Apr 18 06:00:45 2011
@@ -54,6 +54,7 @@ public class FIXConstants {
public static final long DEFAULT_HEART_BT_INT_VALUE = 30;
public static final String DEFAULT_START_TIME_VALUE = "00:00:00";
public static final String DEFAULT_END_TIME_VALUE = "00:00:00";
+ public static final int DEFAULT_COUNTER_UPPER_LIMIT = 1000000000;
public static final String HEART_BY_INT = "HeartBtInt";
public static final String BEGIN_STRING = "BeginString";
@@ -102,4 +103,11 @@ public class FIXConstants {
public static final String FIX_DROP_EXTRA_RESPONSES = "transport.fix.DropExtraResponses";
+ public static final String FIX_ACCEPTOR_EVENT_HANDLER = "transport.fix.AcceptorSessionEventHandler";
+ public static final String FIX_INITIATOR_EVENT_HANDLER = "transport.fix.InitiatorSessionEventHandler";
+
+ //--------------------------- Message level properties -----------------------------------
+
+ public static final String FIX_IGNORE_ORDER = "transport.fix.IgnoreOrder";
+
}
\ No newline at end of file
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java Mon Apr 18 06:00:45 2011
@@ -41,11 +41,12 @@ import quickfix.field.SenderCompID;
import quickfix.field.TargetCompID;
import javax.xml.namespace.QName;
-import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* FIXIncomingMessageHandler is responsible for handling all incoming FIX messages. This is where the
@@ -63,11 +64,12 @@ public class FIXIncomingMessageHandler i
/** A boolean value indicating the type of the FIX application */
private boolean acceptor;
/** A Map of counters with one counter per session */
- private Map<SessionID, Integer> countersMap;
+ private Map<SessionID, AtomicInteger> countersMap;
private Queue<MessageContext> outgoingMessages;
- private boolean allNewApproach;
- private boolean dropExtraResponses;
+ private boolean allNewApproach = true;
+ private boolean dropExtraResponses = false;
private Semaphore semaphore;
+ private SessionEventHandler eventHandler;
public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool workerPool,
AxisService service, boolean acceptor) {
@@ -76,25 +78,43 @@ public class FIXIncomingMessageHandler i
this.service = service;
this.log = LogFactory.getLog(this.getClass());
this.acceptor = acceptor;
- countersMap = new HashMap<SessionID, Integer>();
+ countersMap = new ConcurrentHashMap<SessionID, AtomicInteger>();
outgoingMessages = new LinkedBlockingQueue<MessageContext>();
semaphore = new Semaphore(0);
getResponseHandlingApproach();
+
+ Parameter eventHandlerParam;
+ if (acceptor) {
+ eventHandlerParam = service.getParameter(FIXConstants.FIX_ACCEPTOR_EVENT_HANDLER);
+ } else {
+ eventHandlerParam = service.getParameter(FIXConstants.FIX_INITIATOR_EVENT_HANDLER);
+ }
+
+ if (eventHandlerParam != null && eventHandlerParam.getValue() != null &&
+ !"".equals(eventHandlerParam.getValue())) {
+ try {
+ Class clazz = getClass().getClassLoader().loadClass(
+ (String) eventHandlerParam.getValue());
+ eventHandler = (SessionEventHandler) clazz.newInstance();
+ } catch (ClassNotFoundException e) {
+ log.error("Unable to find the session event handler class: " +
+ eventHandlerParam.getValue(), e);
+ } catch (Exception e) {
+ log.error("Error while initializing the session event handler class: " +
+ eventHandlerParam.getValue(), e);
+ }
+ }
}
private void getResponseHandlingApproach() {
Parameter param = service.getParameter(FIXConstants.FIX_RESPONSE_HANDLER_APPROACH);
if (param != null && "false".equals(param.getValue().toString())) {
allNewApproach = false;
- } else {
- allNewApproach = true;
}
Parameter dropResponsesParam = service.getParameter(FIXConstants.FIX_DROP_EXTRA_RESPONSES);
if (dropResponsesParam != null && "true".equals(dropResponsesParam.getValue().toString())) {
dropExtraResponses = true;
- } else {
- dropExtraResponses = false;
}
}
@@ -119,12 +139,15 @@ public class FIXIncomingMessageHandler i
* Sessions exist whether or not a counter party is connected to it. As soon
* as a session is created, the application can begin sending messages to it. If no one
* is logged on, the messages will be sent at the time a connection is
- * established with the counterparty.
+ * established with the counter party.
*
* @param sessionID QuickFIX session ID
*/
public void onCreate(SessionID sessionID) {
log.info("New FIX session created: " + sessionID.toString());
+ if (eventHandler != null) {
+ eventHandler.onCreate(sessionID);
+ }
}
/**
@@ -136,9 +159,15 @@ public class FIXIncomingMessageHandler i
* @param sessionID QuickFIX session ID
*/
public void onLogon(SessionID sessionID) {
- countersMap.put(sessionID, 0);
+ if (!countersMap.containsKey(sessionID)) {
+ countersMap.put(sessionID, new AtomicInteger(0));
+ }
log.info("FIX session logged on: " + sessionID.toString());
semaphore.release();
+
+ if (eventHandler != null) {
+ eventHandler.onLogon(sessionID);
+ }
}
/**
@@ -149,11 +178,14 @@ public class FIXIncomingMessageHandler i
* @param sessionID QuickFIX session ID
*/
public void onLogout(SessionID sessionID) {
- countersMap.put(sessionID, 0);
FIXTransportSender trpSender = (FIXTransportSender) cfgCtx.getAxisConfiguration().
getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
trpSender.logOutIncomingSession(sessionID);
log.info("FIX session logged out: " + sessionID.toString());
+
+ if (eventHandler != null) {
+ eventHandler.onLogout(sessionID);
+ }
}
/**
@@ -182,6 +214,10 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.toAdmin(message, sessionID);
+ }
}
/**
@@ -209,11 +245,15 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.fromAdmin(message, sessionID);
+ }
}
/**
* This is a callback for application messages that are being sent to a
- * counterparty.
+ * counter party.
*
* @param message QuickFIX message
* @param sessionID QuickFIX session ID
@@ -236,6 +276,10 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.toApp(message, sessionID);
+ }
}
/**
@@ -264,10 +308,12 @@ public class FIXIncomingMessageHandler i
}
}
- int counter = countersMap.get(sessionID);
- counter++;
- countersMap.put(sessionID, counter);
-
+ AtomicInteger atomicCounter = countersMap.get(sessionID);
+ int counter = atomicCounter.incrementAndGet();
+ boolean rolled = atomicCounter.compareAndSet(FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT, 0);
+ if (rolled && log.isDebugEnabled()) {
+ log.debug("Incoming request counter rolled over for the session: " + sessionID);
+ }
workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
}
@@ -288,7 +334,12 @@ public class FIXIncomingMessageHandler i
}
private void handleIncomingRequest() {
- //Create message context for the incmong message
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + sessionID + " - Received message with sequence " +
+ "number " + counter);
+ }
+
+ //Create message context for the incoming message
AbstractTransportListener trpListener = (AbstractTransportListener) cfgCtx.getAxisConfiguration().
getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
@@ -311,16 +362,12 @@ public class FIXIncomingMessageHandler i
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
msgCtx.setSoapAction("urn:" + operation.getName().getLocalPart());
}
- } else {
- log.warn("Service information not available for the FIX message processor");
- return;
}
String fixApplication = FIXConstants.FIX_INITIATOR;
if (acceptor) {
fixApplication = FIXConstants.FIX_ACCEPTOR;
- }
- else {
+ } else {
msgCtx.setProperty("synapse.isresponse", true);
}
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java Mon Apr 18 06:00:45 2011
@@ -20,28 +20,32 @@
package org.apache.synapse.transport.fix;
import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* FIXOutgoingMessageHandler makes sure that messages are delivered in the order they were received by
- * a FIX acceptor. In case the message arrived over a different transport srill this class will try to
+ * a FIX acceptor. In case the message arrived over a different transport still this class will try to
* put the messages in correct order based on the counter value of the message.
*/
public class FIXOutgoingMessageHandler {
+ private static final Log log = LogFactory.getLog(FIXOutgoingMessageHandler.class);
+
private Map<String, Integer> countersMap;
private Map<String, Map<Integer,Object[]>> messagesMap;
private FIXSessionFactory sessionFactory;
public FIXOutgoingMessageHandler() {
- countersMap = new HashMap<String, Integer>();
- messagesMap = new HashMap<String, Map<Integer,Object[]>>();
+ countersMap = new ConcurrentHashMap<String, Integer>();
+ messagesMap = new ConcurrentHashMap<String, Map<Integer,Object[]>>();
}
public void setSessionFactory(FIXSessionFactory sessionFactory) {
@@ -64,35 +68,43 @@ public class FIXOutgoingMessageHandler {
public synchronized void sendMessage(Message message, SessionID targetSession, String sourceSession,
int counter, MessageContext msgCtx, String targetEPR) throws SessionNotFound {
- if (sourceSession != null && counter != -1) {
+ boolean ignoreOrder = "true".equals(msgCtx.getProperty(FIXConstants.FIX_IGNORE_ORDER));
+ if (sourceSession != null && counter != -1 && !ignoreOrder) {
int expectedValue;
if (countersMap.containsKey(sourceSession)) {
expectedValue = countersMap.get(sourceSession);
- }
- else {
+ } else {
//create new entries in the respective Maps
//counter starts at 1
countersMap.put(sourceSession, 1);
- messagesMap.put(sourceSession, new HashMap<Integer,Object[]>());
+ messagesMap.put(sourceSession, new ConcurrentHashMap<Integer,Object[]>());
expectedValue = 1;
}
if (expectedValue == counter) {
sendToTarget(msgCtx, targetEPR, message, targetSession);
- countersMap.put(sourceSession, expectedValue++);
+ if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue) {
+ if (log.isDebugEnabled()) {
+ log.debug("Outgoing request counter rolled over for the session: " +
+ sourceSession + " (from " + expectedValue + ")");
+ }
+ expectedValue = 1;
+ }
+ countersMap.put(sourceSession, ++expectedValue);
sendQueuedMessages(expectedValue, sourceSession);
}
else {
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + sourceSession + " - Expected sequence number (" +
+ expectedValue + ") does not match with the actual sequence number (" +
+ counter + "). Holding the message back for later delivery.");
+ }
+
//save the message to be sent later...
Map<Integer,Object[]> messages = messagesMap.get(sourceSession);
- Object[] obj = new Object[4];
- obj[0] = message;
- obj[1] = targetSession;
- obj[2] = msgCtx;
- obj[3] = targetEPR;
+ Object[] obj = new Object[] { message, targetSession, msgCtx, targetEPR } ;
messages.put(counter, obj);
- messagesMap.put(sourceSession, messages);
}
}
else {
@@ -144,11 +156,23 @@ public class FIXOutgoingMessageHandler {
String targetEPR = null;
if (obj[2] != null) {
msgCtx = (MessageContext) obj[2];
- targetEPR = obj[3].toString();
+ targetEPR = (String) obj[3];
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + session + " - Sending the previously queued message " +
+ "with the sequence number: " + expectedValue);
}
sendToTarget(msgCtx, targetEPR, message, sessionID);
messages.remove(expectedValue);
- obj = messages.get(expectedValue++);
+ if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue) {
+ if (log.isDebugEnabled()) {
+ log.debug("Outgoing request counter rolled over for the session: " + session +
+ " (from " + expectedValue + ")");
+ }
+ expectedValue = 1;
+ }
+ obj = messages.get(++expectedValue);
}
messagesMap.put(session, messages);
countersMap.put(session, expectedValue);
@@ -163,6 +187,12 @@ public class FIXOutgoingMessageHandler {
if (obj != null) {
Message message = (Message) obj[0];
SessionID sessionID = (SessionID) obj[1];
+
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + session + " - Flushing the previously queued " +
+ "message with the sequence number: " + expectedValue);
+ }
+
try {
Session.sendToTarget(message, sessionID);
} catch (SessionNotFound ignore) { }
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Mon Apr 18 06:00:45 2011
@@ -98,7 +98,7 @@ public class FIXSessionFactory {
* acceptorStore keyed by the service name and start it.
*
* @param service the AxisService
- * @return true if the acceptor was created and started properly and false otherwise
+ * @return true if the acceptor is successfully initialized and false otherwise
* @throws AxisFault if the acceptor cannot be created
*/
public boolean createFIXAcceptor(AxisService service) throws AxisFault {
@@ -175,7 +175,7 @@ public class FIXSessionFactory {
try {
settings = new SessionSettings(fixConfigStream);
} catch (ConfigError e) {
- throw new AxisFault("Error in the specified FIX configuration for the initiaotr. " +
+ throw new AxisFault("Error in the specified FIX configuration for the initiator. " +
"Unable to initialize a FIX session for the service " +
service.getName(), e);
}
@@ -349,7 +349,7 @@ public class FIXSessionFactory {
* @param serviceName the name of the AxisService
* @return a FIX Acceptor for the service
*/
- public Acceptor getAccepter(String serviceName) {
+ public Acceptor getAcceptor(String serviceName) {
return acceptorStore.get(serviceName);
}
@@ -361,7 +361,7 @@ public class FIXSessionFactory {
*/
public Initiator getInitiator(String fixEPR) {
return initiatorStore.get(fixEPR);
- }
+ }
/**
* Get the FIX configuration URL from the services.xml.
@@ -391,8 +391,8 @@ public class FIXSessionFactory {
} catch (IOException e) {
log.error("Error while reading from the URL " + fixConfigURLValue, e);
}
- } else if (log.isDebugEnabled()) {
- log.debug("FIX configuration URL is not specified for the service " + service.getName());
+ } else {
+ log.info("FIX configuration URL is not specified for the service " + service.getName());
}
return fixConfigStream;
@@ -506,8 +506,7 @@ public class FIXSessionFactory {
jmxExporter.setRegistrationBehavior(JmxExporter.REGISTRATION_IGNORE_EXISTING);
jmxExporter.export(connector);
} catch (JMException e) {
- log.error("Error while initializing JMX support for the FIX sessions in " +
- "service: " + service, e);
+ log.error("Error while initializing JMX support for the service: " + service, e);
}
}
}
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Mon Apr 18 06:00:45 2011
@@ -71,7 +71,7 @@ public class FIXTransportListener extend
*
* @param service the service for which to listen for messages
*/
- protected void startListeningForService(AxisService service) {
+ public void startListeningForService(AxisService service) {
try {
boolean acceptorCreated = fixSessionFactory.createFIXAcceptor(service);
boolean initiatorCreated = fixSessionFactory.createFIXInitiator(service);
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Mon Apr 18 06:00:45 2011
@@ -46,7 +46,7 @@ import java.util.Map;
* <p/>
* This transport sender implementation does not support forwarding FIX messages to sessions with
* different BeginString values.When it performs a message forwarding it makes sure the forwarding
- * takes place according to the conditions specified in the 'Thirs Party Routing' section in the
+ * takes place according to the conditions specified in the 'Third Party Routing' section in the
* FIX protocol specification.
*/
public class FIXTransportSender extends AbstractTransportSender {
@@ -108,7 +108,7 @@ public class FIXTransportSender extends
try {
fixMessage = FIXUtils.getInstance().createFIXMessage(msgCtx);
} catch (IOException e) {
- handleException("Exception occured while creating the FIX message from SOAP Envelope", e);
+ handleException("Exception occurred while creating the FIX message from SOAP Envelope", e);
}
if (FIXConstants.FIX_ACCEPTOR.equals(fixApplication)) {
@@ -118,11 +118,12 @@ public class FIXTransportSender extends
sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
} else if (outTransportInfo != null && outTransportInfo instanceof FIXOutTransportInfo) {
//Send the message back to the sender
- sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage, sourceSession, counter);
+ sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage,
+ sourceSession, counter, msgCtx);
}
} else if (FIXConstants.FIX_INITIATOR.equals(fixApplication)) {
- if (sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter)) {
+ if (sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter, msgCtx)) {
return;
} else if (targetEPR != null) {
sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
@@ -135,12 +136,12 @@ public class FIXTransportSender extends
if (targetEPR != null) {
sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
} else {
- sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter);
+ sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter, msgCtx);
}
}
}
- private boolean isTargetVald(Map<String, String> fieldValues, SessionID targetSession,
+ private boolean isTargetValid(Map<String, String> fieldValues, SessionID targetSession,
boolean beginStrValidation) {
String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
@@ -239,7 +240,7 @@ public class FIXTransportSender extends
/**
* Puts DeliverToX fields in the message to enable the message to be forwarded at the destination.
- * This method retireves the parameters from the services.xml and put them in the message as
+ * This method retrieves the parameters from the services.xml and put them in the message as
* DeliverToX fields. Should be used when a response message has to forwarded at the destination.
*
* @param message the FIX message to be forwarded
@@ -322,12 +323,14 @@ public class FIXTransportSender extends
* @param fixMessage the FIX message to be sent
* @param srcSession String uniquely identifying the incoming session
* @param counter application level sequence number of the message
- * @param serviceName name of the AxisSerivce for the message
+ * @param serviceName name of the AxisService for the message
+ * @param msgCtx Axis2 MessageContext
* @return boolean value indicating the result
* @throws AxisFault on error
*/
private boolean sendUsingTrpOutInfo(OutTransportInfo trpOutInfo, String serviceName,
- Message fixMessage, String srcSession, int counter) throws AxisFault {
+ Message fixMessage, String srcSession, int counter,
+ MessageContext msgCtx) throws AxisFault {
FIXOutTransportInfo fixOut = (FIXOutTransportInfo) trpOutInfo;
SessionID sessionID = fixOut.getSessionID();
@@ -356,7 +359,7 @@ public class FIXTransportSender extends
}
try {
- messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, null, null);
+ messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, msgCtx, null);
return true;
} catch (SessionNotFound e) {
log.error("Error while sending the FIX message. Session " + sessionID.toString() + " does" +
@@ -366,22 +369,23 @@ public class FIXTransportSender extends
}
/**
- * Send the message using a session in the aaceptor side
+ * Send the message using a session in the acceptor side
*
* @param serviceName the service of the message
* @param fixMessage the FIX message to be sent
* @param srcSession String uniquely identifying the incoming session
* @param counter the application level sequence number of the message
+ * @param msgCtx Axi2 MessageContext
* @return boolean value indicating the result
* @throws AxisFault on error
*/
private boolean sendUsingAcceptorSession(String serviceName, Message fixMessage, String srcSession,
- int counter) throws AxisFault {
+ int counter, MessageContext msgCtx) throws AxisFault {
Map<String, String> fieldValues = FIXUtils.getMessageForwardingParameters(fixMessage);
String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
- Acceptor acceptor = sessionFactory.getAccepter(serviceName);
+ Acceptor acceptor = sessionFactory.getAcceptor(serviceName);
SessionID sessionID = null;
AxisService service = cfgCtx.getAxisConfiguration().getService(serviceName);
@@ -390,14 +394,14 @@ public class FIXTransportSender extends
ArrayList<SessionID> sessions = acceptor.getSessions();
if (sessions.size() == 1) {
sessionID = sessions.get(0);
- if (deliverToCompID != null && !isTargetVald(fieldValues, sessionID, isValidationOn(service))) {
+ if (deliverToCompID != null && !isTargetValid(fieldValues, sessionID, isValidationOn(service))) {
sessionID = null;
}
} else if (sessions.size() > 1 && deliverToCompID != null) {
- for (int i = 0; i < sessions.size(); i++) {
- sessionID = sessions.get(i);
- if (isTargetVald(fieldValues, sessionID, isValidationOn(service))) {
+ for (SessionID session : sessions) {
+ sessionID = session;
+ if (isTargetValid(fieldValues, sessionID, isValidationOn(service))) {
break;
}
}
@@ -407,7 +411,8 @@ public class FIXTransportSender extends
if (sessionID != null) {
//Found a valid session. Now forward the message...
FIXOutTransportInfo fixOutInfo = new FIXOutTransportInfo(sessionID);
- return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage, srcSession, counter);
+ return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage,
+ srcSession, counter, msgCtx);
}
return false;
}
Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java Mon Apr 18 06:00:45 2011
@@ -213,7 +213,7 @@ public class FIXUtils {
List<Group> groupList = message.getGroups(groupKey);
Iterator<Group> groupIterator = groupList.iterator();
- while(groupIterator.hasNext()) {
+ while (groupIterator.hasNext()) {
Group msgGroup = groupIterator.next();
OMElement groupField = soapFactory.createOMElement(FIXConstants.FIX_GROUP, null);
// rec. call the method to process the repeating groups
Added: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java?rev=1094238&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java (added)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java Mon Apr 18 06:00:45 2011
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.synapse.transport.fix;
+
+public interface SessionEventHandler {
+
+ void onCreate(quickfix.SessionID sessionID);
+
+ void onLogon(quickfix.SessionID sessionID);
+
+ void onLogout(quickfix.SessionID sessionID);
+
+ void toAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+ void fromAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+ void toApp(quickfix.Message message, quickfix.SessionID sessionID);
+
+}