You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2010/09/28 23:27:00 UTC
svn commit: r1002361 [2/3] - in /camel/trunk/components/camel-quickfix: ./
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/camel/ src/main/java/org/apache/camel/component/
src/main/java/org/apache/cam...
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import javax.management.JMException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import quickfix.Acceptor;
+import quickfix.ConfigError;
+import quickfix.DataDictionary;
+import quickfix.FieldConvertError;
+import quickfix.Initiator;
+import quickfix.Message;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.field.HopCompID;
+import quickfix.field.MsgType;
+import quickfix.fix44.Message.Header.NoHops;
+
+
+public class QuickfixjConvertersTest {
+ private static DefaultCamelContext camelContext;
+
+ private File settingsFile;
+ private ClassLoader contextClassLoader;
+ private SessionSettings settings;
+ private File tempdir;
+
+ private QuickfixjEngine quickfixjEngine;
+
+ @BeforeClass
+ public static void classSetUp() throws Exception {
+ camelContext = new DefaultCamelContext();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+ tempdir = settingsFile.getParentFile();
+ URL[] urls = new URL[] {tempdir.toURI().toURL()};
+
+ contextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+ Thread.currentThread().setContextClassLoader(testClassLoader);
+
+ settings = new SessionSettings();
+ settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+ settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+
+ @Test
+ public void convertSessionID() {
+ Object value = camelContext.getTypeConverter().convertTo(SessionID.class, "FIX.4.0:FOO->BAR");
+
+ assertThat(value, instanceOf(SessionID.class));
+ assertThat((SessionID)value, is(new SessionID("FIX.4.0", "FOO", "BAR")));
+ }
+
+ @Test
+ public void convertToExchange() {
+ SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
+ QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+
+ Message message = new Message();
+ message.getHeader().setString(MsgType.FIELD, MsgType.ORDER_SINGLE);
+
+ Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, message, QuickfixjEventCategory.AppMessageSent);
+
+ assertThat((SessionID)exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY), is(sessionID));
+
+ assertThat((QuickfixjEventCategory)exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY),
+ is(QuickfixjEventCategory.AppMessageSent));
+
+ assertThat((String)exchange.getIn().getHeader(QuickfixjEndpoint.MESSAGE_TYPE_KEY), is(MsgType.ORDER_SINGLE));
+ }
+
+ @Test
+ public void convertToExchangeWithNullMessage() {
+ SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
+ QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+
+ Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, null, QuickfixjEventCategory.AppMessageSent);
+
+ assertThat((SessionID)exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY), is(sessionID));
+
+ assertThat((QuickfixjEventCategory)exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY),
+ is(QuickfixjEventCategory.AppMessageSent));
+
+ assertThat(exchange.getIn().getHeader(QuickfixjEndpoint.MESSAGE_TYPE_KEY), is(nullValue()));
+ }
+
+ @Test
+ public void convertMessageWithoutRepeatingGroups() {
+ String data = "8=FIX.4.0\0019=100\00135=D\00134=2\00149=TW\00156=ISLD\00111=ID\00121=1\001"
+ + "40=1\00154=1\00140=2\00138=200\00155=INTC\00110=160\001";
+
+ Exchange exchange = new DefaultExchange(camelContext);
+ Object value = camelContext.getTypeConverter().convertTo(Message.class, exchange, data);
+
+ assertThat(value, instanceOf(Message.class));
+ }
+
+ @Test
+ public void convertMessageWithRepeatingGroupsUsingSessionID() throws Exception {
+ SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+
+ createSession(sessionID);
+
+ try {
+ String data = "8=FIX.4.4\0019=40\00135=A\001"
+ + "627=2\001628=FOO\001628=BAR\001"
+ + "98=0\001384=2\001372=D\001385=R\001372=8\001385=S\00110=230\001";
+
+ Exchange exchange = new DefaultExchange(camelContext);
+ exchange.getIn().setHeader(QuickfixjEndpoint.SESSION_ID_KEY, sessionID);
+ exchange.getIn().setBody(data);
+
+ Message message = exchange.getIn().getBody(Message.class);
+
+ NoHops hop = new NoHops();
+ message.getHeader().getGroup(1, hop);
+ assertEquals("FOO", hop.getString(HopCompID.FIELD));
+ message.getHeader().getGroup(2, hop);
+ assertEquals("BAR", hop.getString(HopCompID.FIELD));
+
+ } finally {
+ quickfixjEngine.stop();
+ }
+ }
+
+ @Test
+ public void convertMessageWithRepeatingGroupsUsingExchangeDictionary() throws Exception {
+ SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+
+ createSession(sessionID);
+
+ try {
+ String data = "8=FIX.4.4\0019=40\00135=A\001"
+ + "627=2\001628=FOO\001628=BAR\001"
+ + "98=0\001384=2\001372=D\001385=R\001372=8\001385=S\00110=230\001";
+
+ Exchange exchange = new DefaultExchange(camelContext);
+ exchange.setProperty(QuickfixjEndpoint.DATA_DICTIONARY_KEY, new DataDictionary("FIX44.xml"));
+ exchange.getIn().setBody(data);
+
+ Message message = exchange.getIn().getBody(Message.class);
+
+ NoHops hop = new NoHops();
+ message.getHeader().getGroup(1, hop);
+ assertEquals("FOO", hop.getString(HopCompID.FIELD));
+ message.getHeader().getGroup(2, hop);
+ assertEquals("BAR", hop.getString(HopCompID.FIELD));
+
+ } finally {
+ quickfixjEngine.stop();
+ }
+ }
+
+ @Test
+ public void convertMessageWithRepeatingGroupsUsingExchangeDictionaryResource() throws Exception {
+ SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+
+ createSession(sessionID);
+
+ try {
+ String data = "8=FIX.4.4\0019=40\00135=A\001"
+ + "627=2\001628=FOO\001628=BAR\001"
+ + "98=0\001384=2\001372=D\001385=R\001372=8\001385=S\00110=230\001";
+
+ Exchange exchange = new DefaultExchange(camelContext);
+ exchange.setProperty(QuickfixjEndpoint.DATA_DICTIONARY_KEY, "FIX44.xml");
+ exchange.getIn().setBody(data);
+
+ Message message = exchange.getIn().getBody(Message.class);
+
+ NoHops hop = new NoHops();
+ message.getHeader().getGroup(1, hop);
+ assertEquals("FOO", hop.getString(HopCompID.FIELD));
+ message.getHeader().getGroup(2, hop);
+ assertEquals("BAR", hop.getString(HopCompID.FIELD));
+
+ } finally {
+ quickfixjEngine.stop();
+ }
+ }
+
+ private void createSession(SessionID sessionID) throws IOException, ConfigError, FieldConvertError, JMException, Exception {
+ SessionSettings settings = new SessionSettings();
+ settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+ TestSupport.setSessionID(settings, sessionID);
+
+ TestSupport.writeSettings(settings, settingsFile);
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+ quickfixjEngine.start();
+ }
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import quickfix.Acceptor;
+import quickfix.ConfigError;
+import quickfix.DefaultMessageFactory;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FileLogFactory;
+import quickfix.FileStoreFactory;
+import quickfix.FixVersions;
+import quickfix.Initiator;
+import quickfix.JdbcLogFactory;
+import quickfix.JdbcSetting;
+import quickfix.JdbcStoreFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.Message;
+import quickfix.ScreenLogFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionNotFound;
+import quickfix.SessionSettings;
+import quickfix.SleepycatStoreFactory;
+import quickfix.SocketAcceptor;
+import quickfix.SocketInitiator;
+import quickfix.ThreadedSocketAcceptor;
+import quickfix.ThreadedSocketInitiator;
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class QuickfixjEngineTest {
+ private File settingsFile;
+ private ClassLoader contextClassLoader;
+ private SessionSettings settings;
+ private SessionID sessionID;
+ private File tempdir;
+ private QuickfixjEngine quickfixjEngine;
+
+ @Before
+ public void setUp() throws Exception {
+ settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+ tempdir = settingsFile.getParentFile();
+ URL[] urls = new URL[] {tempdir.toURI().toURL()};
+
+ contextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+ Thread.currentThread().setContextClassLoader(testClassLoader);
+
+ sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
+
+ settings = new SessionSettings();
+ settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+ settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+ settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+ TestSupport.setSessionID(settings, sessionID);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ if (quickfixjEngine != null) {
+ quickfixjEngine.stop();
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void missingSettingsResource() throws Exception {
+ new QuickfixjEngine("bogus.cfg", false);
+ }
+
+ @Test
+ public void defaultInitiator() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), instanceOf(SocketInitiator.class));
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void threadPerSessionInitiator() throws Exception {
+ settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), instanceOf(ThreadedSocketInitiator.class));
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void defaultAcceptor() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), nullValue());
+ assertThat(quickfixjEngine.getAcceptor(), instanceOf(SocketAcceptor.class));
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void threadPerSessionAcceptor() throws Exception {
+ settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), nullValue());
+ assertThat(quickfixjEngine.getAcceptor(), instanceOf(ThreadedSocketAcceptor.class));
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void minimalInitiatorAndAcceptor() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FARGLE", "BARGLE");
+ settings.setString(initiatorSessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ TestSupport.setSessionID(settings, initiatorSessionID);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), notNullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void inferFileStore() throws Exception {
+ settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(FileStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ // NOTE This is a little strange. If the JDBC driver is set and no log settings are found,
+ // then we use JDBC for both the message store and the log.
+
+ @Test
+ public void inferJdbcStoreAndLog() throws Exception {
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(JdbcLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void ambiguousMessageStore() throws Exception {
+ settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ doAmbiguityTest("Ambiguous message store");
+ }
+
+ @Test
+ public void inferJdbcStoreWithInferredLog() throws Exception {
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void inferSleepycatStore() throws Exception {
+ settings.setString(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(SleepycatStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void inferFileLog() throws Exception {
+ settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(FileLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void ambiguousLog() throws Exception {
+ settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
+ settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ doAmbiguityTest("Ambiguous log");
+ }
+
+ private void doAmbiguityTest(String exceptionText) throws FieldConvertError, IOException, JMException {
+ try {
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+ fail("Expected exception, but none raised");
+ } catch (ConfigError e) {
+ assertTrue(e.getMessage().contains(exceptionText));
+ }
+ }
+
+ @Test
+ public void enableJmxForInitiator() throws Exception {
+ settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Initiator,*"), null);
+ assertFalse("QFJ mbean not registered", n.isEmpty());
+ }
+
+ @Test
+ public void enableJmxForAcceptor() throws Exception {
+ settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Acceptor,*"), null);
+ assertFalse("QFJ mbean not registered", n.isEmpty());
+ }
+
+ @Test
+ public void sessionEvents() throws Exception {
+ SessionID acceptorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "MARKET", "TRADER");
+ SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TRADER", "MARKET");
+
+ quickfixjEngine = new QuickfixjEngine("examples/inprocess.cfg", false);
+
+ doLogonEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+
+ doApplicationMessageEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+
+ doLogoffEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+ }
+
+ private void doLogonEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws Exception, InterruptedException {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch logonLatch = new CountDownLatch(2);
+
+ QuickfixjEventListener logonListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ events.add(new EventRecord(eventCategory, sessionID, message));
+ if (eventCategory == QuickfixjEventCategory.SessionLogon) {
+ logonLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(logonListener);
+
+ quickfixjEngine.start();
+
+ assertTrue("Logons not completed", logonLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(logonListener);
+
+ assertThat(events.size(), is(7));
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionCreated));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+ }
+
+ private void doApplicationMessageEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws SessionNotFound, InterruptedException, FieldNotFound {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch messageLatch = new CountDownLatch(1);
+
+ QuickfixjEventListener messageListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ EventRecord event = new EventRecord(eventCategory, sessionID, message);
+ events.add(event);
+ if (eventCategory == QuickfixjEventCategory.AppMessageReceived) {
+ messageLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(messageListener);
+ Email email = TestSupport.createEmailMessage("Test");
+ Session.sendToTarget(email, initiatorSessionID);
+
+ assertTrue("Application message not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(messageListener);
+
+ assertThat(events.size(), is(2));
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageSent));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+ n++;
+ }
+
+ private void doLogoffEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws Exception, InterruptedException {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch logoffLatch = new CountDownLatch(2);
+
+ QuickfixjEventListener logoffListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ EventRecord event = new EventRecord(eventCategory, sessionID, message);
+ events.add(event);
+ if (eventCategory == QuickfixjEventCategory.SessionLogoff) {
+ logoffLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(logoffListener);
+
+ quickfixjEngine.stop();
+
+ assertTrue("Logoffs not received", logoffLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(logoffListener);
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+ }
+
+ private class EventRecord {
+ private final QuickfixjEventCategory eventCategory;
+ private final SessionID sessionID;
+ private final Message message;
+
+ public EventRecord(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ super();
+ this.eventCategory = eventCategory;
+ this.sessionID = sessionID;
+ this.message = message;
+ }
+
+ public QuickfixjEventCategory getEventCategory() {
+ return eventCategory;
+ }
+
+ public SessionID getSessionID() {
+ return sessionID;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return "EventRecord [eventCategory=" + eventCategory
+ + ", sessionID=" + sessionID + ", message=" + message + "]";
+ }
+ }
+
+ private void assertDefaultConfiguration(QuickfixjEngine quickfixjEngine) throws Exception {
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> names = mbeanServer.queryNames(new ObjectName("org.quickfixj:*"), null);
+ assertTrue("QFJ mbean should not not registered", names.isEmpty());
+ }
+
+ private void writeSettings() throws IOException {
+ TestSupport.writeSettings(settings, settingsFile);
+ }
+
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.SessionID;
+
+public class QuickfixjProducerTest {
+ @Test
+ public void setExceptionOnExchange() throws Exception {
+ Exchange mockExchange = Mockito.mock(Exchange.class);
+
+ QuickfixjEndpoint mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
+ org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+ Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+ Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(new Message());
+
+ SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
+ Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);
+
+ QuickfixjProducer producer = new QuickfixjProducer(mockEndpoint);
+
+ producer.process(mockExchange);
+
+ Mockito.verify(mockExchange).setException(Matchers.isA(IllegalStateException.class));
+ }
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.field.EmailThreadID;
+import quickfix.field.EmailType;
+import quickfix.field.Subject;
+import quickfix.field.Text;
+import quickfix.fix42.Email;
+
+public final class TestSupport {
+ private TestSupport() {
+ // Utility class
+ }
+
+ public static void writeSettings(SessionSettings settings, File settingsFile) throws IOException {
+ FileOutputStream settingsOut = new FileOutputStream(settingsFile);
+ try {
+ settings.toStream(settingsOut);
+ } finally {
+ settingsOut.close();
+ }
+ }
+
+ public static void setSessionID(SessionSettings sessionSettings, SessionID sessionID) {
+ sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, sessionID.getBeginString());
+ sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, sessionID.getSenderCompID());
+ sessionSettings.setString(sessionID, SessionSettings.TARGETCOMPID, sessionID.getTargetCompID());
+ }
+
+ public static Email createEmailMessage(String subject) {
+ Email email = new Email(new EmailThreadID("ID"), new EmailType(EmailType.NEW), new Subject(subject));
+ Email.LinesOfText text = new Email.LinesOfText();
+ text.set(new Text("Content"));
+ email.addGroup(text);
+ return email;
+ }
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.TestSupport;
+import org.apache.camel.component.quickfixj.examples.routing.FixMessageRouter;
+import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter;
+import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.field.DeliverToCompID;
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+public class DynamicRoutingExample {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicRoutingExample.class);
+
+ public static void main(String[] args) throws Exception {
+ new DynamicRoutingExample().sendMessage();
+ }
+
+ public void sendMessage() throws Exception {
+ DefaultCamelContext context = new DefaultCamelContext();
+
+ final CountDownLatch logonLatch = new CountDownLatch(4);
+ final CountDownLatch receivedMessageLatch = new CountDownLatch(1);
+
+ RouteBuilder routes = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // Release latch when session logon events are received
+ // We expect four logon events (four sessions)
+ from("quickfixj:examples/gateway.cfg").
+ filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+ bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+ // Dynamic router -- Uses FIX DeliverTo tags
+ from("quickfixj:examples/gateway.cfg").
+ filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived)).
+ recipientList(bean(new FixMessageRouter("quickfixj:examples/gateway.cfg")));
+
+ // Log app messages as JSON
+ from("quickfixj:examples/gateway.cfg").
+ filter(PredicateBuilder.or(
+ header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived),
+ header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageSent))).
+ bean(new QuickfixjMessageJsonPrinter());
+
+ // If the trader@2 session receives an email then release the latch
+ from("quickfixj:examples/gateway.cfg?sessionID=FIX.4.2:TRADER@2->GATEWAY").
+ filter(PredicateBuilder.and(
+ header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived),
+ header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL))).
+ bean(new CountDownLatchDecrementer("message", receivedMessageLatch));
+ }
+ };
+
+ context.addRoutes(routes);
+
+ LOG.info("Starting Camel context");
+ context.start();
+
+ // This is not strictly necessary, but it prevents the need for session
+ // synchronization due to app messages being sent before being logged on
+ if (!logonLatch.await(5, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Logon did not complete");
+ }
+
+ String gatewayUri = "quickfixj:examples/gateway.cfg?sessionID=FIX.4.2:TRADER@1->GATEWAY";
+ Endpoint gatewayEndpoint = context.getEndpoint(gatewayUri);
+ Producer producer = gatewayEndpoint.createProducer();
+
+ Email email = TestSupport.createEmailMessage("Dynamic Routing Example");
+ email.getHeader().setString(DeliverToCompID.FIELD, "TRADER@2");
+
+ LOG.info("Sending routed message");
+
+ Exchange exchange = producer.createExchange(ExchangePattern.InOnly);
+ exchange.getIn().setBody(email);
+ producer.process(exchange);
+
+ if (!receivedMessageLatch.await(5, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Message did not reach target");
+ }
+
+ LOG.info("Message received, shutting down Camel context");
+
+ context.stop();
+
+ LOG.info("Dynamic routing example complete");
+ }
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.TestSupport;
+import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter;
+import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+/**
+ * This example demonstrates several features of the QuickFIX/J component. It uses
+ * QFJ session events to synchronize application behavior (e.g., Session logon).
+ *
+ */
+public class SimpleMessagingExample {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleMessagingExample.class);
+
+ public static void main(String[] args) throws Exception {
+ new SimpleMessagingExample().sendMessage();
+ }
+
+ public void sendMessage() throws Exception {
+ DefaultCamelContext context = new DefaultCamelContext();
+
+ final CountDownLatch logonLatch = new CountDownLatch(2);
+ final CountDownLatch receivedMessageLatch = new CountDownLatch(1);
+
+ RouteBuilder routes = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // Release latch when session logon events are received
+ // We expect two events, one for the trader session and one for the market session
+ from("quickfixj:examples/inprocess.cfg").
+ filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+ bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+ // For all received messages, print the JSON-formatted message to stdout
+ from("quickfixj:examples/inprocess.cfg").
+ filter(PredicateBuilder.or(
+ header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
+ header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived))).
+ bean(new QuickfixjMessageJsonPrinter());
+
+ // If the market session receives an email then release the latch
+ from("quickfixj:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER").
+ filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL)).
+ bean(new CountDownLatchDecrementer("message", receivedMessageLatch));
+ }
+ };
+
+ context.addRoutes(routes);
+
+ LOG.info("Starting Camel context");
+ context.start();
+
+ if (!logonLatch.await(5L, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Logon did not succeed");
+ }
+
+ String marketUri = "quickfixj:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET";
+ Producer producer = context.getEndpoint(marketUri).createProducer();
+
+ Email email = TestSupport.createEmailMessage("Example");
+ Exchange exchange = producer.createExchange(ExchangePattern.InOnly);
+ exchange.getIn().setBody(email);
+ producer.process(exchange);
+
+ if (!receivedMessageLatch.await(5L, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Message did not reach market");
+ }
+
+ LOG.info("Message received, shutting down Camel context");
+
+ context.stop();
+
+ LOG.info("Example complete");
+ }
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.routing;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.FieldMap;
+import quickfix.Message;
+import quickfix.Message.Header;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.DeliverToCompID;
+import quickfix.field.DeliverToLocationID;
+import quickfix.field.DeliverToSubID;
+import quickfix.field.OnBehalfOfCompID;
+import quickfix.field.OnBehalfOfLocationID;
+import quickfix.field.OnBehalfOfSubID;
+import quickfix.field.SenderCompID;
+import quickfix.field.SenderLocationID;
+import quickfix.field.SenderSubID;
+import quickfix.field.TargetCompID;
+import quickfix.field.TargetLocationID;
+import quickfix.field.TargetSubID;
+
+/**
+ * Routes exchanges based on FIX-specific routing fields in the message.
+ */
+public class FixMessageRouter {
+ private static final Logger LOG = LoggerFactory.getLogger(FixMessageRouter.class);
+
+ private final String engineUri;
+
+ public FixMessageRouter(String engineUri) {
+ this.engineUri = engineUri;
+ }
+
+ public String route(Exchange exchange) {
+ Message message = exchange.getIn().getBody(Message.class);
+ if (message != null) {
+ SessionID destinationSession = getDestinationSessionID(message);
+ if (destinationSession != null) {
+ String destinationUri = String.format("%s?sessionID=%s", engineUri, destinationSession);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Routing destination: " + destinationUri);
+ }
+ return destinationUri;
+ }
+ }
+ return null;
+ }
+
+ private SessionID getDestinationSessionID(Message message) {
+ Header header = message.getHeader();
+ String fixVersion = getField(header, BeginString.FIELD);
+ String destinationCompId = getField(header, DeliverToCompID.FIELD);
+ if (destinationCompId != null) {
+ String destinationSubId = getField(header, DeliverToSubID.FIELD);
+ String destinationLocationId = getField(header, DeliverToLocationID.FIELD);
+
+ header.removeField(DeliverToCompID.FIELD);
+ header.removeField(DeliverToSubID.FIELD);
+ header.removeField(DeliverToLocationID.FIELD);
+
+ String gatewayCompId = getField(header, TargetCompID.FIELD);
+ String gatewaySubId = getField(header, TargetSubID.FIELD);
+ String gatewayLocationId = getField(header, TargetLocationID.FIELD);
+
+ header.setString(OnBehalfOfCompID.FIELD, getField(header, SenderCompID.FIELD));
+ if (header.isSetField(SenderSubID.FIELD)) {
+ header.setString(OnBehalfOfSubID.FIELD, getField(header, SenderSubID.FIELD));
+ }
+ if (header.isSetField(SenderLocationID.FIELD)) {
+ header.setString(OnBehalfOfLocationID.FIELD, getField(header, SenderLocationID.FIELD));
+ }
+
+ return new SessionID(fixVersion, gatewayCompId, gatewaySubId, gatewayLocationId,
+ destinationCompId, destinationSubId, destinationLocationId, null);
+ }
+ return null;
+ }
+
+ private String getField(FieldMap fieldMap, int tag) {
+ if (fieldMap.isSetField(tag)) {
+ try {
+ return fieldMap.getString(tag);
+ } catch (Exception e) {
+ // won't happen
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+/**
+ * Trivial market data provider interface to allow plugins for
+ * alternative market data sources.
+ *
+ */
+public interface MarketQuoteProvider {
+ double getBid(String symbol);
+ double getAsk(String symbol);
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+public interface QuickfixjMessageListener {
+ void onMessage(SessionID sessionID, Message message) throws Exception;
+}
Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.ConfigError;
+import quickfix.DataDictionary;
+import quickfix.DataDictionaryProvider;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FixVersions;
+import quickfix.IncorrectTagValue;
+import quickfix.LogUtil;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+import quickfix.SessionNotFound;
+import quickfix.UnsupportedMessageType;
+import quickfix.field.ApplVerID;
+import quickfix.field.AvgPx;
+import quickfix.field.CumQty;
+import quickfix.field.ExecID;
+import quickfix.field.ExecTransType;
+import quickfix.field.ExecType;
+import quickfix.field.LastPx;
+import quickfix.field.LastQty;
+import quickfix.field.LastShares;
+import quickfix.field.LeavesQty;
+import quickfix.field.OrdStatus;
+import quickfix.field.OrdType;
+import quickfix.field.OrderID;
+import quickfix.field.OrderQty;
+import quickfix.field.Price;
+import quickfix.field.Side;
+import quickfix.field.Symbol;
+
+/**
+ * Trade executor based on QFJ example "executor" (No Camel dependencies)
+ */
+public class TradeExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(TradeExecutor.class);
+
+ private boolean alwaysFillLimitOrders;
+ private Set<String> validOrderTypes = new HashSet<String>();
+ private MarketQuoteProvider marketQuoteProvider;
+
+ private List<QuickfixjMessageListener> listeners = new CopyOnWriteArrayList<QuickfixjMessageListener>();
+
+ private int orderID;
+ private int execID;
+
+ @SuppressWarnings("serial")
+ public TradeExecutor() throws ConfigError, FieldConvertError {
+ setAlwaysFillLimitOrders(true);
+ setValidOrderTypes(new HashSet<String>() { { add(OrdType.LIMIT + ""); add(OrdType.MARKET + ""); } });
+ setMarketQuoteProvider(new DefaultMarketQuoteProvider(10.00));
+ }
+
+ public void setAlwaysFillLimitOrders(boolean alwaysFillLimitOrders) {
+ this.alwaysFillLimitOrders = alwaysFillLimitOrders;
+ }
+
+ public void setMarketQuoteProvider(MarketQuoteProvider marketQuoteProvider) {
+ this.marketQuoteProvider = marketQuoteProvider;
+ }
+
+ public void setValidOrderTypes(String validOrderTypes) {
+ setValidOrderTypes(new HashSet<String>(Arrays.asList(validOrderTypes.split("\\s*,\\s*"))));
+ }
+
+ public void setValidOrderTypes(Set<String> validOrderTypes) {
+ this.validOrderTypes = validOrderTypes;
+ }
+
+ public void addListener(QuickfixjMessageListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(QuickfixjMessageListener listener) {
+ listeners.remove(listener);
+ }
+
+ public void execute(final Message message) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ final SessionID sessionID = MessageUtils.getSessionID(message);
+
+ try {
+ if (message instanceof quickfix.fix40.NewOrderSingle) {
+ onMessage((quickfix.fix40.NewOrderSingle) message, sessionID);
+ } else if (message instanceof quickfix.fix41.NewOrderSingle) {
+ onMessage((quickfix.fix41.NewOrderSingle) message, sessionID);
+ } else if (message instanceof quickfix.fix42.NewOrderSingle) {
+ onMessage((quickfix.fix42.NewOrderSingle) message, sessionID);
+ } else if (message instanceof quickfix.fix43.NewOrderSingle) {
+ onMessage((quickfix.fix43.NewOrderSingle) message, sessionID);
+ } else if (message instanceof quickfix.fix44.NewOrderSingle) {
+ onMessage((quickfix.fix44.NewOrderSingle) message, sessionID);
+ } else if (message instanceof quickfix.fix50.NewOrderSingle) {
+ onMessage((quickfix.fix50.NewOrderSingle) message, sessionID);
+ }
+ } catch (Exception e) {
+ LOG.error("Error submitting execution task", e);
+ }
+ }
+
+ private void onMessage(quickfix.fix40.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+
+ Price price = getPrice(order);
+
+ quickfix.fix40.ExecutionReport accept = new quickfix.fix40.ExecutionReport(genOrderID(), genExecID(),
+ new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.NEW), order.getSymbol(), order.getSide(),
+ orderQty, new LastShares(0), new LastPx(0), new CumQty(0), new AvgPx(0));
+
+ accept.set(order.getClOrdID());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix40.ExecutionReport fill = new quickfix.fix40.ExecutionReport(genOrderID(), genExecID(),
+ new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order
+ .getSide(), orderQty, new LastShares(orderQty.getValue()), new LastPx(price.getValue()),
+ new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+ fill.set(order.getClOrdID());
+
+ sendMessage(sessionID, fill);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ private boolean isOrderExecutable(Message order, Price price) throws FieldNotFound {
+ if (order.getChar(OrdType.FIELD) == OrdType.LIMIT) {
+ BigDecimal limitPrice = new BigDecimal(order.getString(Price.FIELD));
+ char side = order.getChar(Side.FIELD);
+ BigDecimal thePrice = new BigDecimal(Double.toString(price.getValue()));
+
+ return (side == Side.BUY && thePrice.compareTo(limitPrice) <= 0)
+ || ((side == Side.SELL || side == Side.SELL_SHORT) && thePrice.compareTo(limitPrice) >= 0);
+ }
+ return true;
+ }
+
+ private Price getPrice(Message message) throws FieldNotFound {
+ Price price;
+ if (message.getChar(OrdType.FIELD) == OrdType.LIMIT && alwaysFillLimitOrders) {
+ price = new Price(message.getDouble(Price.FIELD));
+ } else {
+ if (marketQuoteProvider == null) {
+ throw new RuntimeException("No market data provider specified for market order");
+ }
+ char side = message.getChar(Side.FIELD);
+ if (side == Side.BUY) {
+ price = new Price(marketQuoteProvider.getAsk(message.getString(Symbol.FIELD)));
+ } else if (side == Side.SELL || side == Side.SELL_SHORT) {
+ price = new Price(marketQuoteProvider.getBid(message.getString(Symbol.FIELD)));
+ } else {
+ throw new RuntimeException("Invalid order side: " + side);
+ }
+ }
+ return price;
+ }
+
+ private void sendMessage(SessionID sessionID, Message message) {
+ try {
+ Session session = Session.lookupSession(sessionID);
+ if (session == null) {
+ throw new SessionNotFound(sessionID.toString());
+ }
+
+ DataDictionaryProvider provider = session.getDataDictionaryProvider();
+ if (provider != null) {
+ try {
+ ApplVerID applVerID = getApplVerID(session, message);
+ DataDictionary appDataDictionary = provider.getApplicationDataDictionary(applVerID, null);
+ appDataDictionary.validate(message, true);
+ } catch (Exception e) {
+ LogUtil.logThrowable(sessionID, "Outgoing message failed validation: "
+ + e.getMessage(), e);
+ return;
+ }
+ }
+
+ for (QuickfixjMessageListener listener : listeners) {
+ try {
+ listener.onMessage(sessionID, message);
+ } catch (Throwable e) {
+ LogUtil.logThrowable(sessionID, "Error while dispatching message", e);
+ }
+ }
+
+ } catch (SessionNotFound e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private ApplVerID getApplVerID(Session session, Message message) {
+ String beginString = session.getSessionID().getBeginString();
+ if (FixVersions.BEGINSTRING_FIXT11.equals(beginString)) {
+ return new ApplVerID(ApplVerID.FIX50);
+ } else {
+ return MessageUtils.toApplVerID(beginString);
+ }
+ }
+
+ private void onMessage(quickfix.fix41.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+ Price price = getPrice(order);
+
+ quickfix.fix41.ExecutionReport accept = new quickfix.fix41.ExecutionReport(genOrderID(), genExecID(),
+ new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order
+ .getSymbol(), order.getSide(), orderQty, new LastShares(0), new LastPx(0), new LeavesQty(0),
+ new CumQty(0), new AvgPx(0));
+
+ accept.set(order.getClOrdID());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix41.ExecutionReport executionReport = new quickfix.fix41.ExecutionReport(genOrderID(),
+ genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL), new OrdStatus(
+ OrdStatus.FILLED), order.getSymbol(), order.getSide(), orderQty, new LastShares(orderQty
+ .getValue()), new LastPx(price.getValue()), new LeavesQty(0), new CumQty(orderQty
+ .getValue()), new AvgPx(price.getValue()));
+
+ executionReport.set(order.getClOrdID());
+
+ sendMessage(sessionID, executionReport);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ private void onMessage(quickfix.fix42.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+ Price price = getPrice(order);
+
+ quickfix.fix42.ExecutionReport accept = new quickfix.fix42.ExecutionReport(genOrderID(), genExecID(),
+ new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order
+ .getSymbol(), order.getSide(), new LeavesQty(0), new CumQty(0), new AvgPx(0));
+
+ accept.set(order.getClOrdID());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix42.ExecutionReport executionReport = new quickfix.fix42.ExecutionReport(genOrderID(),
+ genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL),
+ new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order.getSide(), new LeavesQty(0),
+ new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+ executionReport.set(order.getClOrdID());
+ executionReport.set(orderQty);
+ executionReport.set(new LastShares(orderQty.getValue()));
+ executionReport.set(new LastPx(price.getValue()));
+
+ sendMessage(sessionID, executionReport);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ private void validateOrder(Message order) throws IncorrectTagValue, FieldNotFound {
+ OrdType ordType = new OrdType(order.getChar(OrdType.FIELD));
+ if (!validOrderTypes.contains(Character.toString(ordType.getValue()))) {
+ LOG.error("Order type not in ValidOrderTypes setting");
+ throw new IncorrectTagValue(ordType.getField());
+ }
+ if (ordType.getValue() == OrdType.MARKET && marketQuoteProvider == null) {
+ LOG.error("DefaultMarketPrice setting not specified for market order");
+ throw new IncorrectTagValue(ordType.getField());
+ }
+ }
+
+ private void onMessage(quickfix.fix43.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+ Price price = getPrice(order);
+
+ quickfix.fix43.ExecutionReport accept = new quickfix.fix43.ExecutionReport(
+ genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW),
+ order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0));
+
+ accept.set(order.getClOrdID());
+ accept.set(order.getSymbol());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix43.ExecutionReport executionReport = new quickfix.fix43.ExecutionReport(genOrderID(),
+ genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(),
+ new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+ executionReport.set(order.getClOrdID());
+ executionReport.set(order.getSymbol());
+ executionReport.set(orderQty);
+ executionReport.set(new LastQty(orderQty.getValue()));
+ executionReport.set(new LastPx(price.getValue()));
+
+ sendMessage(sessionID, executionReport);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ private void onMessage(quickfix.fix44.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+ Price price = getPrice(order);
+
+ quickfix.fix44.ExecutionReport accept = new quickfix.fix44.ExecutionReport(
+ genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW),
+ order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0));
+
+ accept.set(order.getClOrdID());
+ accept.set(order.getSymbol());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix44.ExecutionReport executionReport = new quickfix.fix44.ExecutionReport(genOrderID(),
+ genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(),
+ new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+ executionReport.set(order.getClOrdID());
+ executionReport.set(order.getSymbol());
+ executionReport.set(orderQty);
+ executionReport.set(new LastQty(orderQty.getValue()));
+ executionReport.set(new LastPx(price.getValue()));
+
+ sendMessage(sessionID, executionReport);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ private void onMessage(quickfix.fix50.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+ try {
+ validateOrder(order);
+
+ OrderQty orderQty = order.getOrderQty();
+ Price price = getPrice(order);
+
+ quickfix.fix50.ExecutionReport accept = new quickfix.fix50.ExecutionReport(
+ genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW),
+ order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0));
+
+ accept.set(order.getClOrdID());
+ accept.set(order.getSymbol());
+ sendMessage(sessionID, accept);
+
+ if (isOrderExecutable(order, price)) {
+ quickfix.fix50.ExecutionReport executionReport = new quickfix.fix50.ExecutionReport(
+ genOrderID(), genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED),
+ order.getSide(), new LeavesQty(0), new CumQty(orderQty.getValue()));
+
+ executionReport.set(order.getClOrdID());
+ executionReport.set(order.getSymbol());
+ executionReport.set(orderQty);
+ executionReport.set(new LastQty(orderQty.getValue()));
+ executionReport.set(new LastPx(price.getValue()));
+ executionReport.set(new AvgPx(price.getValue()));
+
+ sendMessage(sessionID, executionReport);
+ }
+ } catch (RuntimeException e) {
+ LogUtil.logThrowable(sessionID, e.getMessage(), e);
+ }
+ }
+
+ public OrderID genOrderID() {
+ return new OrderID(Integer.valueOf(++orderID).toString());
+ }
+
+ public ExecID genExecID() {
+ return new ExecID(Integer.valueOf(++execID).toString());
+ }
+
+ private static class DefaultMarketQuoteProvider implements MarketQuoteProvider {
+ private double defaultMarketPrice;
+
+ public DefaultMarketQuoteProvider(double defaultMarketPrice) {
+ this.defaultMarketPrice = defaultMarketPrice;
+ }
+
+ public double getAsk(String symbol) {
+ return defaultMarketPrice;
+ }
+
+ public double getBid(String symbol) {
+ return defaultMarketPrice;
+ }
+ };
+}
\ No newline at end of file