You are viewing a plain text version of this content. The canonical link for it was not preserved in this text export.
Posted to commits@camel.apache.org by ha...@apache.org on 2011/06/02 17:26:13 UTC
svn commit: r1130604 - in /camel/trunk/components/camel-quickfix: ./
src/main/java/org/apache/camel/component/quickfixj/
src/test/java/org/apache/camel/component/quickfixj/
src/test/java/org/apache/camel/component/quickfixj/examples/
src/test/java/org/...
Author: hadrian
Date: Thu Jun 2 15:26:12 2011
New Revision: 1130604
URL: http://svn.apache.org/viewvc?rev=1130604&view=rev
Log:
CAMEL-4038. Original contributed patch applied with thanks to Steve. Checkstyle fixes to follow.
Added:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
Modified:
camel/trunk/components/camel-quickfix/pom.xml
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg
Modified: camel/trunk/components/camel-quickfix/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/pom.xml?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/pom.xml (original)
+++ camel/trunk/components/camel-quickfix/pom.xml Thu Jun 2 15:26:12 2011
@@ -67,12 +67,16 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jetty</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java?rev=1130604&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java Thu Jun 2 15:26:12 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class MessageCorrelator implements QuickfixjEventListener {
+ public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L;
+ private final CopyOnWriteArrayList<MessageCorrelationRule> rules = new CopyOnWriteArrayList<MessageCorrelationRule>();
+
+ public Callable<Message> getReply(SessionID sessionID, Exchange exchange)
+ throws InterruptedException, ExchangeTimedOutException {
+
+ MessagePredicate messageCriteria = (MessagePredicate) exchange
+ .getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY);
+ final MessageCorrelationRule correlationRule = new MessageCorrelationRule(
+ exchange, sessionID, messageCriteria);
+
+ final long timeout = exchange.getProperty(
+ QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+ DEFAULT_CORRELATION_TIMEOUT, Long.class);
+
+ rules.add(correlationRule);
+
+ return new Callable<Message>() {
+ @Override
+ public Message call() throws Exception {
+ if (!correlationRule.getLatch().await(timeout,
+ TimeUnit.MILLISECONDS)) {
+ throw new ExchangeTimedOutException(
+ correlationRule.getExchange(), timeout);
+ }
+ return correlationRule.getReplyMessage();
+ }
+ };
+ }
+
+ @Override
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) throws Exception {
+ if (message != null) {
+ for (MessageCorrelationRule rule : rules) {
+ if (rule.getMessageCriteria().evaluate(message)) {
+ rule.setReplyMessage(message);
+ rules.remove(rule);
+ rule.getLatch().countDown();
+ }
+ }
+ }
+ }
+
+ private class MessageCorrelationRule {
+ private final Exchange exchange;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final MessagePredicate messageCriteria;
+
+ private Message replyMessage;
+
+ public MessageCorrelationRule(Exchange exchange, SessionID sessionID,
+ MessagePredicate messageCriteria) {
+ this.exchange = exchange;
+ this.messageCriteria = messageCriteria;
+ }
+
+ public void setReplyMessage(Message message) {
+ this.replyMessage = message;
+ }
+
+ public Message getReplyMessage() {
+ return replyMessage;
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public MessagePredicate getMessageCriteria() {
+ return messageCriteria;
+ }
+ }
+
+}
Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java?rev=1130604&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java (added)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java Thu Jun 2 15:26:12 2011
@@ -0,0 +1,68 @@
+package org.apache.camel.component.quickfixj;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import quickfix.Field;
+import quickfix.FieldMap;
+import quickfix.FieldNotFound;
+import quickfix.Message;
+import quickfix.SessionID;
+import quickfix.field.MsgType;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
+public class MessagePredicate {
+ private final List<Field<String>> headerCriteria = new ArrayList<Field<String>>();
+ private final List<Field<String>> bodyCriteria = new ArrayList<Field<String>>();
+
+ public MessagePredicate(SessionID requestingSessionID, String msgType) {
+ // Reverse session ID for reply
+ // TODO may need to optionally include subID and locationID
+ addHeaderFieldIfPresent(SenderCompID.FIELD, requestingSessionID.getTargetCompID());
+ addHeaderFieldIfPresent(TargetCompID.FIELD, requestingSessionID.getSenderCompID());
+ withMessageType(msgType);
+ }
+
+ private void addHeaderFieldIfPresent(int tag, String value) {
+ if (value != null && !"".equals(value)) {
+ withHeaderField(tag, value);
+ }
+ }
+
+ public boolean evaluate(Message message) {
+ return evaluate(message, bodyCriteria) && evaluate(message.getHeader(), headerCriteria);
+ }
+
+ private boolean evaluate(FieldMap fieldMap, List<Field<String>> criteria) {
+ for (Field<String> c : criteria) {
+ String value = null;
+ try {
+ if (fieldMap.isSetField(c.getField())) {
+ value = fieldMap.getString(c.getField());
+ }
+ } catch (FieldNotFound e) {
+ // ignored, shouldn't happen
+ }
+ if (!c.getObject().equals(value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public MessagePredicate withField(int tag, String value) {
+ bodyCriteria.add(new Field<String>(tag, value));
+ return this;
+ }
+
+ public MessagePredicate withHeaderField(int tag, String value) {
+ headerCriteria.add(new Field<String>(tag, value));
+ return this;
+ }
+
+ private MessagePredicate withMessageType(String msgType) {
+ headerCriteria.add(new Field<String>(MsgType.FIELD, msgType));
+ return this;
+ }
+}
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Thu Jun 2 15:26:12 2011
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+
import javax.management.JMException;
import org.apache.camel.Endpoint;
@@ -29,6 +30,7 @@ import org.apache.camel.impl.DefaultComp
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import quickfix.ConfigError;
import quickfix.FieldConvertError;
import quickfix.LogFactory;
@@ -72,7 +74,7 @@ public class QuickfixjComponent extends
}
}
- endpoint = new QuickfixjEndpoint(uri, getCamelContext());
+ endpoint = new QuickfixjEndpoint(engine, uri, getCamelContext());
engine.addEventListener(endpoint);
endpoints.put(uri, endpoint);
}
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java Thu Jun 2 15:26:12 2011
@@ -18,12 +18,17 @@ package org.apache.camel.component.quick
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+
public class QuickfixjConsumer extends DefaultConsumer {
- public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
+ public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@@ -31,9 +36,38 @@ public class QuickfixjConsumer extends D
if (isStarted()) {
try {
getProcessor().process(exchange);
+ if (exchange.getPattern().isOutCapable() && exchange.hasOut()) {
+ sendOutMessage(exchange);
+ }
} catch (Exception e) {
exchange.setException(e);
}
}
}
+
+ private void sendOutMessage(Exchange exchange) {
+ try {
+ Message camelMessage = exchange.getOut();
+ quickfix.Message quickfixjMessage = camelMessage.getBody(quickfix.Message.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sending FIX message reply: " + quickfixjMessage.toString());
+ }
+
+ SessionID messageSessionID = MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: " + messageSessionID);
+ }
+
+ session.send(quickfixjMessage);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ }
+
+ Session getSession(SessionID messageSessionID) {
+ return Session.lookupSession(messageSessionID);
+ }
}
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java Thu Jun 2 15:26:12 2011
@@ -25,73 +25,124 @@ import org.apache.camel.Exchange;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.Message;
import quickfix.SessionID;
-public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEventListener, MultipleConsumersSupport {
- public static final String EVENT_CATEGORY_KEY = "EventCategory";
- public static final String SESSION_ID_KEY = "SessionID";
- public static final String MESSAGE_TYPE_KEY = "MessageType";
- public static final String DATA_DICTIONARY_KEY = "DataDictionary";
-
- private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class);
-
- private SessionID sessionID;
- private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
-
- public QuickfixjEndpoint(String uri, CamelContext context) {
- super(uri, context);
- }
-
- protected SessionID getSessionID() {
- return sessionID;
- }
-
- public void setSessionID(SessionID sessionID) {
- this.sessionID = sessionID;
- }
-
- public Consumer createConsumer(Processor processor) throws Exception {
- LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? sessionID : "No Session"));
- QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
- consumers.add(consumer);
- return consumer;
- }
-
- public Producer createProducer() throws Exception {
- LOG.info("Creating QuickFIX/J producer: " + (sessionID != null ? sessionID : "No Session"));
- return new QuickfixjProducer(this);
- }
-
- public boolean isSingleton() {
- return true;
- }
-
- public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception {
- if (this.sessionID == null || this.sessionID.equals(sessionID)) {
- for (QuickfixjConsumer consumer : consumers) {
- Exchange exchange = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory);
- consumer.onExchange(exchange);
- if (exchange.getException() != null) {
- throw exchange.getException();
- }
- }
- }
- }
-
- public boolean isMultipleConsumersSupported() {
- return true;
- }
-
- @Override
- protected void doStop() throws Exception {
- // clear list of consumers
- consumers.clear();
- }
+public class QuickfixjEndpoint extends DefaultEndpoint implements
+ QuickfixjEventListener, MultipleConsumersSupport {
+ public static final String EVENT_CATEGORY_KEY = "EventCategory";
+ public static final String SESSION_ID_KEY = "SessionID";
+ public static final String MESSAGE_TYPE_KEY = "MessageType";
+ public static final String DATA_DICTIONARY_KEY = "DataDictionary";
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(QuickfixjEndpoint.class);
+
+ private SessionID sessionID;
+ private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
+ private final QuickfixjEngine engine;
+
+ public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext context) {
+ super(uri, context);
+ this.engine = engine;
+ }
+
+ public SessionID getSessionID() {
+ return sessionID;
+ }
+
+ public void setSessionID(SessionID sessionID) {
+ this.sessionID = sessionID;
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ LOG.info("Creating QuickFIX/J consumer: "
+ + (sessionID != null ? sessionID : "No Session")
+ + ", ExchangePattern=" + getExchangePattern());
+ QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
+ consumers.add(consumer);
+ return consumer;
+ }
+
+ public Producer createProducer() throws Exception {
+ LOG.info("Creating QuickFIX/J producer: "
+ + (sessionID != null ? sessionID : "No Session"));
+ if (isWildcarded()) {
+ throw new ResolveEndpointFailedException(
+ "Cannot create consumer on wildcarded session identifier: "
+ + sessionID);
+ }
+ return new QuickfixjProducer(this);
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) throws Exception {
+ if (this.sessionID == null || isMatching(sessionID)) {
+ for (QuickfixjConsumer consumer : consumers) {
+ Exchange exchange = QuickfixjConverters.toExchange(this,
+ sessionID, message, eventCategory);
+ consumer.onExchange(exchange);
+ if (exchange.getException() != null) {
+ throw exchange.getException();
+ }
+ }
+ }
+ }
+
+ private boolean isMatching(SessionID sessionID) {
+ return this.sessionID.equals(sessionID)
+ || (isMatching(this.sessionID.getBeginString(),
+ sessionID.getBeginString())
+ && isMatching(this.sessionID.getSenderCompID(),
+ sessionID.getSenderCompID())
+ && isMatching(this.sessionID.getSenderSubID(),
+ sessionID.getSenderSubID())
+ && isMatching(this.sessionID.getSenderLocationID(),
+ sessionID.getSenderLocationID())
+ && isMatching(this.sessionID.getTargetCompID(),
+ sessionID.getTargetCompID())
+ && isMatching(this.sessionID.getTargetSubID(),
+ sessionID.getTargetSubID()) && isMatching(
+ this.sessionID.getTargetLocationID(),
+ sessionID.getTargetLocationID()));
+ }
+
+ private boolean isMatching(String s1, String s2) {
+ return s1.equals("") || s1.equals("*") || s1.equals(s2);
+ }
+
+ private boolean isWildcarded() {
+ return sessionID != null
+ && (sessionID.getBeginString().equals("*")
+ || sessionID.getSenderCompID().equals("*")
+ || sessionID.getSenderSubID().equals("*")
+ || sessionID.getSenderLocationID().equals("*")
+ || sessionID.getTargetCompID().equals("*")
+ || sessionID.getTargetSubID().equals("*") || sessionID
+ .getTargetLocationID().equals("*"));
+ }
+
+ public boolean isMultipleConsumersSupported() {
+ return true;
+ }
+
+ public QuickfixjEngine getEngine() {
+ return engine;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // clear list of consumers
+ consumers.clear();
+ }
}
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Thu Jun 2 15:26:12 2011
@@ -92,7 +92,8 @@ public class QuickfixjEngine {
private final MessageStoreFactory messageStoreFactory;
private final LogFactory sessionLogFactory;
private final MessageFactory messageFactory;
-
+ private final MessageCorrelator messageCorrelator = new MessageCorrelator();
+
private boolean started;
private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>();
@@ -119,6 +120,8 @@ public class QuickfixjEngine {
MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride,
MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException {
+ addEventListener(messageCorrelator);
+
this.uri = uri;
this.forcedShutdown = forcedShutdown;
@@ -463,6 +466,10 @@ public class QuickfixjEngine {
return uri;
}
+ public MessageCorrelator getMessageCorrelator() {
+ return messageCorrelator;
+ }
+
// For Testing
Initiator getInitiator() {
return initiator;
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java Thu Jun 2 15:26:12 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.quickfixj;
+import java.util.concurrent.Callable;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
@@ -26,31 +28,61 @@ import quickfix.Session;
import quickfix.SessionID;
public class QuickfixjProducer extends DefaultProducer {
- private final SessionID sessionID;
-
+ public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout";
+ public static final String CORRELATION_CRITERIA_KEY = "CorrelationCriteria";
+
+ private final SessionID sessionID;
+
public QuickfixjProducer(Endpoint endpoint) {
super(endpoint);
sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID();
}
public void process(Exchange exchange) throws Exception {
- Message message = exchange.getIn().getBody(Message.class);
- if (log.isDebugEnabled()) {
- log.debug("Sending FIX message: " + message.toString());
- }
-
- SessionID messageSessionID = sessionID;
- if (messageSessionID == null) {
- messageSessionID = MessageUtils.getSessionID(message);
- }
-
- Session session = Session.lookupSession(messageSessionID);
- if (session == null) {
- exchange.setException(new IllegalStateException("Unknown session: " + messageSessionID));
- return;
- }
-
- session.send(message);
+ sendMessage(exchange, exchange.getIn());
}
+ void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage)
+ throws InterruptedException {
+ try {
+ Message message = camelMessage.getBody(Message.class);
+ if (log.isDebugEnabled()) {
+ log.debug("Sending FIX message: " + message.toString());
+ }
+
+ SessionID messageSessionID = sessionID;
+ if (messageSessionID == null) {
+ messageSessionID = MessageUtils.getSessionID(message);
+ }
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: "
+ + messageSessionID);
+ }
+
+ Callable<Message> callable = null;
+
+ if (exchange.getPattern().isOutCapable()) {
+ QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+ MessageCorrelator messageCorrelator = endpoint.getEngine()
+ .getMessageCorrelator();
+ callable = messageCorrelator.getReply(
+ endpoint.getSessionID(), exchange);
+ }
+
+ session.send(message);
+
+ if (callable != null) {
+ Message reply = callable.call();
+ exchange.getOut().setBody(reply);
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ }
+
+ Session getSession(SessionID messageSessionID) {
+ return Session.lookupSession(messageSessionID);
+ }
}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java Thu Jun 2 15:26:12 2011
@@ -28,21 +28,41 @@ import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
public class QuickfixjConsumerTest {
private Exchange mockExchange;
private Processor mockProcessor;
private Endpoint mockEndpoint;
-
+ private SessionID sessionID;
+ private Message inboundFixMessage;
+
@Before
public void setUp() {
+
mockExchange = Mockito.mock(Exchange.class);
org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+ inboundFixMessage = new Message();
+ inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
+ inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+ inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+ sessionID = MessageUtils.getSessionID(inboundFixMessage);
+
+ Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
+
mockProcessor = Mockito.mock(Processor.class);
mockEndpoint = Mockito.mock(Endpoint.class);
- Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
+ Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
}
@Test
@@ -67,8 +87,7 @@ public class QuickfixjConsumerTest {
}
@Test
- public void setExceptionOnExchange() throws Exception {
-
+ public void setExceptionOnExchange() throws Exception {
QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
consumer.start();
@@ -79,5 +98,45 @@ public class QuickfixjConsumerTest {
consumer.onExchange(mockExchange);
Mockito.verify(mockExchange).setException(exception);
- }
+ }
+
+ @Test
+ public void setExceptionOnInOutExchange() throws Exception {
+ org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+ Mockito.when(mockExchange.hasOut()).thenReturn(true);
+ Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage);
+ Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(new Message());
+
+ QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor);
+ consumer.start();
+
+ // Simulate a message from the FIX engine
+ consumer.onExchange(mockExchange);
+
+ Mockito.verify(mockExchange).setException(Mockito.isA(IllegalStateException.class));
+ }
+
+ @Test
+ public void processInOutExchange() throws Exception {
+ org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.hasOut()).thenReturn(true);
+ Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage);
+ Message outboundFixMessage = new Message();
+ Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
+
+ QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor));
+ Session mockSession = Mockito.spy(TestSupport.createSession(sessionID));
+ Mockito.doReturn(mockSession).when(consumer).getSession(MessageUtils.getReverseSessionID(inboundFixMessage));
+ Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
+
+ consumer.start();
+
+ Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+
+ consumer.onExchange(mockExchange);
+
+ Mockito.verify(mockExchange, Mockito.never()).setException(Mockito.isA(Exception.class));
+ Mockito.verify(mockSession).send(outboundFixMessage);
+ }
}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java Thu Jun 2 15:26:12 2011
@@ -98,7 +98,7 @@ public class QuickfixjConvertersTest {
@Test
public void convertToExchange() {
SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
- QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+ QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", camelContext);
Message message = new Message();
message.getHeader().setString(MsgType.FIELD, MsgType.ORDER_SINGLE);
@@ -116,7 +116,7 @@ public class QuickfixjConvertersTest {
@Test
public void convertToExchangeWithNullMessage() {
SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
- QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+ QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", camelContext);
Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, null, QuickfixjEventCategory.AppMessageSent);
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java Thu Jun 2 15:26:12 2011
@@ -16,33 +16,133 @@
*/
package org.apache.camel.component.quickfixj;
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.JMException;
+
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import quickfix.ConfigError;
+import quickfix.FieldConvertError;
import quickfix.FixVersions;
import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.MsgType;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+import quickfix.fix42.Email;
public class QuickfixjProducerTest {
-
+ private Exchange mockExchange;
+ private QuickfixjEndpoint mockEndpoint;
+ private org.apache.camel.Message mockCamelMessage;
+ private QuickfixjProducer producer;
+ private SessionID sessionID;
+ private Message inboundFixMessage;
+ private QuickfixjEngine quickfixjEngine;
+
+ @Before
+ public void setUp() throws ConfigError, FieldConvertError, IOException, JMException {
+ mockExchange = Mockito.mock(Exchange.class);
+ mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
+ mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+ Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+
+ quickfixjEngine = TestSupport.createEngine();
+ Mockito.when(mockEndpoint.getEngine()).thenReturn(quickfixjEngine);
+
+ inboundFixMessage = new Message();
+ inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
+ inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+ inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+ sessionID = MessageUtils.getSessionID(inboundFixMessage);
+
+ Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage);
+
+ Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);
+
+ producer = Mockito.spy(new QuickfixjProducer(mockEndpoint));
+ }
+
+    /** Unchecked marker exception for these tests; static so it carries no reference to the enclosing test instance. */
+    @SuppressWarnings("serial")
+    public static class TestException extends RuntimeException {
+    }
+
@Test
public void setExceptionOnExchange() throws Exception {
- Exchange mockExchange = Mockito.mock(Exchange.class);
+ Session mockSession = Mockito.spy(TestSupport.createSession(sessionID));
+ Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+ Mockito.doThrow(new TestException()).when(mockSession).send(Mockito.isA(Message.class));
- QuickfixjEndpoint mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
- org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
- Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
- Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(new Message());
+ producer.process(mockExchange);
+ Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class));
+ }
+
+    /** Processing the exchange must send the FIX body through the session and set no IllegalStateException. */
+    @Test
+    public void processInOnlyExchange() throws Exception {
+        SessionID lookupKey = MessageUtils.getSessionID(inboundFixMessage);
+        Session spiedSession = Mockito.spy(TestSupport.createSession(sessionID));
+        Mockito.doReturn(spiedSession).when(producer).getSession(lookupKey);
+        Mockito.doReturn(true).when(spiedSession).send(Mockito.isA(Message.class));
+        producer.process(mockExchange);
+        Mockito.verify(spiedSession).send(inboundFixMessage);
+        Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+    }
- SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
- Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);
+ @Test
+ public void processInOutExchange() throws Exception {
+ Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+ Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).
+ thenReturn(new MessagePredicate(sessionID, MsgType.EMAIL));
+ Mockito.when(mockExchange.getProperty(
+ QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+ 1000L, Long.class)).thenReturn(5000L);
+
+ org.apache.camel.Message mockOutboundCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
- QuickfixjProducer producer = new QuickfixjProducer(mockEndpoint);
+ final Message outboundFixMessage = new Email();
+ outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET");
+ outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER");
+ Session mockSession = Mockito.spy(TestSupport.createSession(sessionID));
+ Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived, sessionID, outboundFixMessage);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 10);
+ return true;
+ }
+ }).when(mockSession).send(Mockito.isA(Message.class));
+
producer.process(mockExchange);
- Mockito.verify(mockExchange).setException(Matchers.isA(IllegalStateException.class));
+ Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+ Mockito.verify(mockSession).send(inboundFixMessage);
+ Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
}
}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java Thu Jun 2 15:26:12 2011
@@ -19,7 +19,23 @@ package org.apache.camel.component.quick
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Date;
+import javax.management.JMException;
+
+import org.mockito.Mockito;
+
+import quickfix.Acceptor;
+import quickfix.Application;
+import quickfix.ConfigError;
+import quickfix.DefaultSessionFactory;
+import quickfix.FieldConvertError;
+import quickfix.LogFactory;
+import quickfix.MessageFactory;
+import quickfix.MessageStore;
+import quickfix.MessageStoreFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
import quickfix.SessionID;
import quickfix.SessionSettings;
import quickfix.field.EmailThreadID;
@@ -55,4 +71,51 @@ public final class TestSupport {
email.addGroup(text);
return email;
}
+
+    /**
+     * Creates an acceptor session for the given ID from mocked QuickFIX/J collaborators.
+     * The message store, application and log factory are Mockito mocks; the data dictionary is switched off.
+     */
+    public static Session createSession(SessionID sessionID) throws ConfigError, IOException {
+        MessageStore store = Mockito.mock(MessageStore.class);
+        Mockito.when(store.getCreationTime()).thenReturn(new Date());
+        MessageStoreFactory storeFactory = Mockito.mock(MessageStoreFactory.class);
+        Mockito.when(storeFactory.create(sessionID)).thenReturn(store);
+
+        SessionSettings settings = new SessionSettings();
+        settings.setLong(Session.SETTING_HEARTBTINT, 10);
+        settings.setString(Session.SETTING_START_TIME, "00:00:00");
+        settings.setString(Session.SETTING_END_TIME, "00:00:00");
+        settings.setString(SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+
+        Application application = Mockito.mock(Application.class);
+        LogFactory logFactory = Mockito.mock(LogFactory.class);
+        DefaultSessionFactory factory = new DefaultSessionFactory(application, storeFactory, logFactory);
+        return factory.create(sessionID, settings);
+    }
+
+    /**
+     * Creates an engine configured with one acceptor session, FIX.4.4:SENDER->TARGET, on accept port 8000.
+     * The message store factory, log factory and message factory are Mockito mocks.
+     */
+    public static QuickfixjEngine createEngine() throws ConfigError, FieldConvertError, IOException, JMException {
+        SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
+
+        SessionSettings settings = new SessionSettings();
+        settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10);
+        settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00");
+        settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00");
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000);
+        settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, false);
+
+        MessageStore store = Mockito.mock(MessageStore.class);
+        Mockito.when(store.getCreationTime()).thenReturn(new Date());
+        MessageStoreFactory storeFactory = Mockito.mock(MessageStoreFactory.class);
+        Mockito.when(storeFactory.create(sessionID)).thenReturn(store);
+        LogFactory logFactory = Mockito.mock(LogFactory.class);
+        MessageFactory messageFactory = Mockito.mock(MessageFactory.class);
+        return new QuickfixjEngine("", settings, false, storeFactory, logFactory, messageFactory);
+    }
}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java?rev=1130604&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java Thu Jun 2 15:26:12 2011
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Body;
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.MessagePredicate;
+import org.apache.camel.component.quickfixj.QuickfixjComponent;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.QuickfixjProducer;
+import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer;
+import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.eclipse.jetty.util.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.FieldNotFound;
+import quickfix.SessionID;
+import quickfix.field.AvgPx;
+import quickfix.field.ClOrdID;
+import quickfix.field.CumQty;
+import quickfix.field.ExecID;
+import quickfix.field.ExecTransType;
+import quickfix.field.ExecType;
+import quickfix.field.LeavesQty;
+import quickfix.field.MsgType;
+import quickfix.field.OrdStatus;
+import quickfix.field.OrderID;
+import quickfix.field.Side;
+import quickfix.field.Symbol;
+import quickfix.fix42.ExecutionReport;
+import quickfix.fix42.OrderStatusRequest;
+
+/** Example: turns an HTTP order status request into a FIX request/reply exchange and renders the reply as JSON. */
+public class RequestReplyExample {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestReplyExample.class);
+
+    public static void main(String[] args) throws Exception {
+        new RequestReplyExample().run();
+    }
+
+    /** Starts the routes, waits for logon, sends one HTTP order status request and stops the started context. */
+    public void run() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext();
+        final CountDownLatch logonLatch = new CountDownLatch(1);
+        final String orderStatusServiceUrl = "http://localhost:9123/order/status";
+
+        RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Synchronize the logon so we don't start sending status requests too early
+                from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET").
+                    filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+                    bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+                // Incoming status requests are converted to InOut exchange pattern and passed to the
+                // order status service. The response is sent back to the session making the request.
+                from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut").
+                    filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST)).
+                    bean(new MarketOrderStatusService());
+
+                from("jetty:" + orderStatusServiceUrl).
+                    bean(new OrderStatusRequestTransformer()).
+                    routingSlip(bean(FixSessionRouter.class, "route")).
+                    bean(new QuickfixjMessageJsonTransformer());
+            }
+        };
+        context.addRoutes(routes);
+        LOG.info("Starting Camel context");
+        context.start();
+        try {
+            if (!logonLatch.await(5L, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Logon did not succeed");
+            }
+            requestOrderStatus(orderStatusServiceUrl);
+        } finally {
+            // Stop the context on failure too, so the started routes are not left running
+            LOG.info("Shutting down Camel context");
+            context.stop();
+        }
+        LOG.info("Example complete");
+    }
+
+    /** Sends one request to the order status web service and verifies that the reply starts like the expected JSON. */
+    private void requestOrderStatus(String orderStatusServiceUrl) throws Exception {
+        URL orderStatusUrl = new URL(orderStatusServiceUrl + "?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc");
+        HttpURLConnection connection = (HttpURLConnection) orderStatusUrl.openConnection();
+        BufferedReader orderStatusReply = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+        try {
+            String line = orderStatusReply.readLine();
+            // Constant-first equals: an empty reply (null line) is reported as a bad response, not as an NPE
+            if (!"\"message\": {".equals(line)) {
+                throw new Exception("Don't appear to be a JSON response");
+            }
+            StringBuilder sb = new StringBuilder();
+            while (line != null) {
+                sb.append(line).append('\n');
+                line = orderStatusReply.readLine();
+            }
+            LOG.info("Web request response:\n" + sb);
+        } finally {
+            orderStatusReply.close();
+        }
+    }
+
+    /** Builds the FIX order status request and the reply correlation criteria from the incoming message headers. */
+    public static class OrderStatusRequestTransformer {
+        public void transform(Exchange exchange) throws FieldNotFound {
+            String sessionID = (String) exchange.getIn().getHeader("sessionID");
+            String orderID = (String) exchange.getIn().getHeader("orderID");
+
+            OrderStatusRequest request = new OrderStatusRequest(new ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY));
+            request.set(new OrderID(orderID));
+
+            // Look for a reply execution report back to the requester session
+            // and having the requested OrderID. This is a loose correlation but the best
+            // we can do with FIX 4.2. Newer versions of FIX have an optional explicit correlation field.
+            exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY,
+                new MessagePredicate(new SessionID(sessionID), MsgType.EXECUTION_REPORT)
+                    .withField(OrderID.FIELD, request.getString(OrderID.FIELD)));
+            exchange.getIn().setBody(request);
+        }
+    }
+
+    /** Answers every order status request with an execution report in REJECTED status. */
+    public static class MarketOrderStatusService {
+        private static final Logger LOG = LoggerFactory.getLogger(MarketOrderStatusService.class);
+
+        public ExecutionReport getOrderStatus(OrderStatusRequest request) throws FieldNotFound {
+            LOG.info("Received order status request for orderId=" + request.getOrderID().getValue());
+            return new ExecutionReport(
+                request.getOrderID(), new ExecID(UUID.randomUUID().toString()),
+                new ExecTransType(ExecTransType.STATUS),
+                new ExecType(ExecType.REJECTED), new OrdStatus(OrdStatus.REJECTED),
+                new Symbol("GOOG"), new Side(Side.BUY),
+                new LeavesQty(100), new CumQty(0), new AvgPx(0));
+        }
+    }
+
+    /** Routing slip bean: returns the quickfix endpoint URI for the session named in the "sessionID" header. */
+    public static class FixSessionRouter {
+        public String route(@Header("sessionID") String sessionID, @Body Object body) {
+            return String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID);
+        }
+    }
+}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java Thu Jun 2 15:26:12 2011
@@ -63,7 +63,6 @@ public class TradeExecutorExample {
@Override
public void configure() throws Exception {
// Release latch when session logon events are received
- // We expect four logon events (four sessions)
from("quickfix:examples/inprocess.cfg").
filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
bean(new CountDownLatchDecrementer("logon", logonLatch));
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java Thu Jun 2 15:26:12 2011
@@ -18,16 +18,32 @@ package org.apache.camel.component.quick
import java.util.Iterator;
+import quickfix.ConfigError;
import quickfix.DataDictionary;
import quickfix.Field;
import quickfix.FieldMap;
+import quickfix.FieldNotFound;
import quickfix.FieldType;
import quickfix.Group;
import quickfix.Message;
-
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
public class QuickfixjMessageJsonTransformer {
-
+
+    /** Transforms the message using the data dictionary of the session that MessageUtils derives from it. */
+    public String transform(Message message) throws FieldNotFound, ConfigError {
+        SessionID sessionID = MessageUtils.getSessionID(message);
+        // lookupSession returns null for an unregistered session; report that as a missing dictionary, not an NPE
+        Session session = Session.lookupSession(sessionID);
+        DataDictionary dataDictionary = session != null ? session.getDataDictionary() : null;
+        if (dataDictionary == null) {
+            throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session");
+        }
+        return transform(message, dataDictionary);
+    }
+
public String transform(Message message, DataDictionary dataDictionary) {
return transform(message, "", dataDictionary);
}
Modified: camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg (original)
+++ camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg Thu Jun 2 15:26:12 2011
@@ -23,6 +23,7 @@
UseJmx=Y
SocketAcceptProtocol=VM_PIPE
SocketConnectProtocol=VM_PIPE
+HeartBtInt=120
#
# Initiator for simulating a trader