You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2010/09/28 23:27:00 UTC

svn commit: r1002361 [2/3] - in /camel/trunk/components/camel-quickfix: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/camel/ src/main/java/org/apache/camel/component/ src/main/java/org/apache/cam...

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import javax.management.JMException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import quickfix.Acceptor;
+import quickfix.ConfigError;
+import quickfix.DataDictionary;
+import quickfix.FieldConvertError;
+import quickfix.Initiator;
+import quickfix.Message;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.field.HopCompID;
+import quickfix.field.MsgType;
+import quickfix.fix44.Message.Header.NoHops;
+
+
+/**
+ * Unit tests for {@link QuickfixjConverters}: string to {@link SessionID} and
+ * {@link Message} conversion, and creation of exchanges from QuickFIX/J events.
+ * <p>
+ * Tests that need a data dictionary resolved through a live session create an
+ * engine with {@link #createSession(SessionID)}; {@link #tearDown()} stops it.
+ */
+public class QuickfixjConvertersTest {
+    /** FIX 4.4 message whose header NoHops group carries two entries (FOO, then BAR). */
+    private static final String MESSAGE_WITH_HOPS = "8=FIX.4.4\0019=40\00135=A\001"
+        + "627=2\001628=FOO\001628=BAR\001"
+        + "98=0\001384=2\001372=D\001385=R\001372=8\001385=S\00110=230\001";
+
+    private static DefaultCamelContext camelContext;
+
+    private File settingsFile;
+    private ClassLoader contextClassLoader;
+    private SessionSettings settings;
+    private File tempdir;
+
+    private QuickfixjEngine quickfixjEngine;
+
+    @BeforeClass
+    public static void classSetUp() throws Exception {
+        camelContext = new DefaultCamelContext();
+    }
+
+    /**
+     * Creates a temporary settings file and installs a context class loader that
+     * can see the temp directory, so the engine can load the file by name.
+     */
+    @Before
+    public void setUp() throws Exception {
+        settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+        tempdir = settingsFile.getParentFile();
+        URL[] urls = new URL[] {tempdir.toURI().toURL()};
+
+        contextClassLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+        Thread.currentThread().setContextClassLoader(testClassLoader);
+
+        // NOTE(review): these defaults are not read by createSession(), which
+        // builds its own SessionSettings instance.
+        settings = new SessionSettings();
+        settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+    }
+
+    /**
+     * Stops the engine if a test created one, restores the original context
+     * class loader and removes the temporary settings file.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (quickfixjEngine != null) {
+                quickfixjEngine.stop();
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(contextClassLoader);
+            // Fall back to delete-on-exit when the file cannot be removed right away.
+            if (settingsFile != null && !settingsFile.delete()) {
+                settingsFile.deleteOnExit();
+            }
+        }
+    }
+
+    @Test
+    public void convertSessionID() {
+        Object value = camelContext.getTypeConverter().convertTo(SessionID.class, "FIX.4.0:FOO->BAR");
+
+        assertThat(value, instanceOf(SessionID.class));
+        assertThat((SessionID)value, is(new SessionID("FIX.4.0", "FOO", "BAR")));
+    }
+
+    @Test
+    public void convertToExchange() {
+        SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
+        QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+
+        Message message = new Message();
+        message.getHeader().setString(MsgType.FIELD, MsgType.ORDER_SINGLE);
+
+        Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, message, QuickfixjEventCategory.AppMessageSent);
+
+        assertThat((SessionID)exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY), is(sessionID));
+
+        assertThat((QuickfixjEventCategory)exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY),
+                is(QuickfixjEventCategory.AppMessageSent));
+
+        assertThat((String)exchange.getIn().getHeader(QuickfixjEndpoint.MESSAGE_TYPE_KEY), is(MsgType.ORDER_SINGLE));
+    }
+
+    @Test
+    public void convertToExchangeWithNullMessage() {
+        SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
+        QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+
+        Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, null, QuickfixjEventCategory.AppMessageSent);
+
+        assertThat((SessionID)exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY), is(sessionID));
+
+        assertThat((QuickfixjEventCategory)exchange.getIn().getHeader(QuickfixjEndpoint.EVENT_CATEGORY_KEY),
+                is(QuickfixjEventCategory.AppMessageSent));
+
+        // Without a message there is no message type to expose.
+        assertThat(exchange.getIn().getHeader(QuickfixjEndpoint.MESSAGE_TYPE_KEY), is(nullValue()));
+    }
+
+    @Test
+    public void convertMessageWithoutRepeatingGroups() {
+        String data = "8=FIX.4.0\0019=100\00135=D\00134=2\00149=TW\00156=ISLD\00111=ID\00121=1\001"
+            + "40=1\00154=1\00140=2\00138=200\00155=INTC\00110=160\001";
+
+        Exchange exchange = new DefaultExchange(camelContext);
+        Object value = camelContext.getTypeConverter().convertTo(Message.class, exchange, data);
+
+        assertThat(value, instanceOf(Message.class));
+    }
+
+    /** The session ID header on the message is used to parse the repeating group. */
+    @Test
+    public void convertMessageWithRepeatingGroupsUsingSessionID() throws Exception {
+        SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+        createSession(sessionID);
+
+        Exchange exchange = new DefaultExchange(camelContext);
+        exchange.getIn().setHeader(QuickfixjEndpoint.SESSION_ID_KEY, sessionID);
+        exchange.getIn().setBody(MESSAGE_WITH_HOPS);
+
+        assertHopsParsed(exchange.getIn().getBody(Message.class));
+    }
+
+    /** A {@link DataDictionary} instance in the exchange property is used to parse the group. */
+    @Test
+    public void convertMessageWithRepeatingGroupsUsingExchangeDictionary() throws Exception {
+        SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+        createSession(sessionID);
+
+        Exchange exchange = new DefaultExchange(camelContext);
+        exchange.setProperty(QuickfixjEndpoint.DATA_DICTIONARY_KEY, new DataDictionary("FIX44.xml"));
+        exchange.getIn().setBody(MESSAGE_WITH_HOPS);
+
+        assertHopsParsed(exchange.getIn().getBody(Message.class));
+    }
+
+    /** A dictionary resource name in the exchange property is used to parse the group. */
+    @Test
+    public void convertMessageWithRepeatingGroupsUsingExchangeDictionaryResource() throws Exception {
+        SessionID sessionID = new SessionID("FIX.4.4", "FOO", "BAR");
+        createSession(sessionID);
+
+        Exchange exchange = new DefaultExchange(camelContext);
+        exchange.setProperty(QuickfixjEndpoint.DATA_DICTIONARY_KEY, "FIX44.xml");
+        exchange.getIn().setBody(MESSAGE_WITH_HOPS);
+
+        assertHopsParsed(exchange.getIn().getBody(Message.class));
+    }
+
+    /**
+     * Asserts that the header of the converted message exposes the two NoHops
+     * entries of {@link #MESSAGE_WITH_HOPS} in order.
+     */
+    private static void assertHopsParsed(Message message) throws Exception {
+        NoHops hop = new NoHops();
+        message.getHeader().getGroup(1, hop);
+        assertEquals("FOO", hop.getString(HopCompID.FIELD));
+        message.getHeader().getGroup(2, hop);
+        assertEquals("BAR", hop.getString(HopCompID.FIELD));
+    }
+
+    /**
+     * Writes an acceptor configuration for the given session to the temporary
+     * settings file, then creates and starts an engine from it. The engine is
+     * stored in {@link #quickfixjEngine} before it is started so that
+     * {@link #tearDown()} can stop it even if start-up fails.
+     */
+    private void createSession(SessionID sessionID) throws IOException, ConfigError, FieldConvertError, JMException, Exception {
+        // Deliberately a fresh instance; named differently from the field to avoid shadowing it.
+        SessionSettings sessionSettings = new SessionSettings();
+        sessionSettings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+
+        sessionSettings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        sessionSettings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+        TestSupport.setSessionID(sessionSettings, sessionID);
+
+        TestSupport.writeSettings(sessionSettings, settingsFile);
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+        quickfixjEngine.start();
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import quickfix.Acceptor;
+import quickfix.ConfigError;
+import quickfix.DefaultMessageFactory;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FileLogFactory;
+import quickfix.FileStoreFactory;
+import quickfix.FixVersions;
+import quickfix.Initiator;
+import quickfix.JdbcLogFactory;
+import quickfix.JdbcSetting;
+import quickfix.JdbcStoreFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.Message;
+import quickfix.ScreenLogFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionNotFound;
+import quickfix.SessionSettings;
+import quickfix.SleepycatStoreFactory;
+import quickfix.SocketAcceptor;
+import quickfix.SocketInitiator;
+import quickfix.ThreadedSocketAcceptor;
+import quickfix.ThreadedSocketInitiator;
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class QuickfixjEngineTest {
+    private File settingsFile;
+    private ClassLoader contextClassLoader;
+    private SessionSettings settings;
+    private SessionID sessionID;
+    private File tempdir;
+    private QuickfixjEngine quickfixjEngine;
+    
+    /**
+     * Creates a temporary settings file, installs a context class loader that
+     * can see the temp directory, and prepares default session settings
+     * (VM pipe transport, no data dictionary) for the FOO/BAR FIX 4.4 session.
+     */
+    @Before
+    public void setUp() throws Exception {
+        settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+        tempdir = settingsFile.getParentFile();
+        final URL tempDirUrl = tempdir.toURI().toURL();
+
+        // Remember the current loader so tearDown() can restore it.
+        final Thread currentThread = Thread.currentThread();
+        contextClassLoader = currentThread.getContextClassLoader();
+        currentThread.setContextClassLoader(new URLClassLoader(new URL[] {tempDirUrl}, contextClassLoader));
+
+        sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
+
+        final String vmPipe = TransportType.VM_PIPE.toString();
+        settings = new SessionSettings();
+        settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, vmPipe);
+        settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+        settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+        TestSupport.setSessionID(settings, sessionID);
+    }
+    
+    /**
+     * Restores the original context class loader, stops the engine if the test
+     * created one, and removes the temporary settings file created in setUp().
+     */
+    @After
+    public void tearDown() throws Exception {
+        Thread.currentThread().setContextClassLoader(contextClassLoader);
+        try {
+            if (quickfixjEngine != null) {
+                quickfixjEngine.stop();
+            }
+        } finally {
+            // Each test gets its own temp file; fall back to delete-on-exit
+            // when it cannot be removed right away.
+            if (settingsFile != null && !settingsFile.delete()) {
+                settingsFile.deleteOnExit();
+            }
+        }
+    }
+    
+    /**
+     * Constructing an engine with a settings resource name that cannot be
+     * found is expected to raise an {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void missingSettingsResource() throws Exception {
+        new QuickfixjEngine("bogus.cfg", false);
+    }
+    
+    /**
+     * An initiator session with no thread model setting yields a
+     * {@link SocketInitiator}, no acceptor, and passes the default
+     * configuration checks.
+     */
+    @Test
+    public void defaultInitiator() throws Exception {
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), instanceOf(SocketInitiator.class));
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertDefaultConfiguration(quickfixjEngine);
+    }
+
+    /**
+     * With the thread model set to ThreadPerSession, an initiator session
+     * yields a {@link ThreadedSocketInitiator} and no acceptor.
+     */
+    @Test
+    public void threadPerSessionInitiator() throws Exception {
+        final String threadModel = QuickfixjEngine.ThreadModel.ThreadPerSession.toString();
+        settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, threadModel);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), instanceOf(ThreadedSocketInitiator.class));
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertDefaultConfiguration(quickfixjEngine);
+    }
+
+    /**
+     * An acceptor session with no thread model setting yields a
+     * {@link SocketAcceptor}, no initiator, and passes the default
+     * configuration checks.
+     */
+    @Test
+    public void defaultAcceptor() throws Exception {
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), nullValue());
+        assertThat(quickfixjEngine.getAcceptor(), instanceOf(SocketAcceptor.class));
+        assertDefaultConfiguration(quickfixjEngine);
+    }
+
+    /**
+     * With the thread model set to ThreadPerSession, an acceptor session
+     * yields a {@link ThreadedSocketAcceptor} and no initiator.
+     */
+    @Test
+    public void threadPerSessionAcceptor() throws Exception {
+        final String threadModel = QuickfixjEngine.ThreadModel.ThreadPerSession.toString();
+        settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, threadModel);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), nullValue());
+        assertThat(quickfixjEngine.getAcceptor(), instanceOf(ThreadedSocketAcceptor.class));
+        assertDefaultConfiguration(quickfixjEngine);
+    }
+
+    /**
+     * A configuration holding one acceptor session and one initiator session
+     * yields an engine with both an acceptor and an initiator.
+     */
+    @Test
+    public void minimalInitiatorAndAcceptor() throws Exception {
+        // Acceptor side: the default FOO/BAR session.
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+        // Initiator side: a second, independent session.
+        final SessionID secondSession = new SessionID(FixVersions.BEGINSTRING_FIX44, "FARGLE", "BARGLE");
+        settings.setString(secondSession, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        TestSupport.setSessionID(settings, secondSession);
+
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), notNullValue());
+        assertDefaultConfiguration(quickfixjEngine);
+    }
+
+    /**
+     * Setting a file store path makes the engine use a {@link FileStoreFactory};
+     * the log and message factories stay at screen log / default.
+     */
+    @Test
+    public void inferFileStore() throws Exception {
+        final String storePath = tempdir.toString();
+        settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, storePath);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(FileStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+    }
+
+    /**
+     * Setting only a JDBC driver makes the engine use JDBC for the message store
+     * and for the log.
+     * <p>
+     * NOTE: this is a little strange. If the JDBC driver is set and no log
+     * settings are found, JDBC is used for both the message store and the log.
+     */
+    @Test
+    public void inferJdbcStoreAndLog() throws Exception {
+        settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(JdbcLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+    }
+
+    /**
+     * Configuring both a file store path and a JDBC driver must be rejected
+     * with an "Ambiguous message store" configuration error.
+     */
+    @Test
+    public void ambiguousMessageStore() throws Exception {
+        final String storePath = tempdir.toString();
+        settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, storePath);
+        settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        doAmbiguityTest("Ambiguous message store");
+    }
+    
+    /**
+     * With a JDBC driver and an explicit screen log setting, the engine uses a
+     * JDBC message store together with a screen log.
+     */
+    @Test
+    public void inferJdbcStoreWithInferredLog() throws Exception {
+        settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+        settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        final String resourceName = settingsFile.getName();
+        quickfixjEngine = new QuickfixjEngine(resourceName, false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+    }
+    
+    /**
+     * Setting a Sleepycat database directory makes the engine use a
+     * {@link SleepycatStoreFactory} with a screen log.
+     */
+    @Test
+    public void inferSleepycatStore() throws Exception {
+        final String databaseDir = tempdir.toString();
+        settings.setString(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR, databaseDir);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(SleepycatStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+    }
+
+    /**
+     * Setting a file log path makes the engine use a {@link FileLogFactory}
+     * while the message store stays in memory.
+     */
+    @Test
+    public void inferFileLog() throws Exception {
+        final String logPath = tempdir.toString();
+        settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, logPath);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+        assertThat(quickfixjEngine.getInitiator(), notNullValue());
+        assertThat(quickfixjEngine.getAcceptor(), nullValue());
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(FileLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+    }
+
+    /**
+     * Configuring both a file log path and a screen log setting must be
+     * rejected with an "Ambiguous log" configuration error.
+     */
+    @Test
+    public void ambiguousLog() throws Exception {
+        final String logPath = tempdir.toString();
+        settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, logPath);
+        settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        doAmbiguityTest("Ambiguous log");
+    }
+
+    private void doAmbiguityTest(String exceptionText) throws FieldConvertError, IOException, JMException {
+        try {
+            quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);     
+            fail("Expected exception, but none raised");
+        } catch (ConfigError e) {
+            assertTrue(e.getMessage().contains(exceptionText));
+        }
+    }
+
+    /**
+     * With JMX enabled, creating an engine for an initiator session registers
+     * an initiator connector MBean on the platform MBean server.
+     */
+    @Test
+    public void enableJmxForInitiator() throws Exception {
+        settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.INITIATOR_CONNECTION_TYPE);
+        writeSettings();
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
+        final ObjectName connectorPattern = new ObjectName("org.quickfixj:type=Connector,role=Initiator,*");
+        final Set<ObjectName> registered = platformServer.queryNames(connectorPattern, null);
+        assertFalse("QFJ mbean not registered", registered.isEmpty());
+    }
+    
+    /**
+     * With JMX enabled, creating an engine for an acceptor session registers
+     * an acceptor connector MBean on the platform MBean server.
+     */
+    @Test
+    public void enableJmxForAcceptor() throws Exception {
+        settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+        settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
+            SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+        settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+        writeSettings();
+
+        quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
+        final ObjectName connectorPattern = new ObjectName("org.quickfixj:type=Connector,role=Acceptor,*");
+        final Set<ObjectName> registered = platformServer.queryNames(connectorPattern, null);
+        assertFalse("QFJ mbean not registered", registered.isEmpty());
+    }
+    
+    /**
+     * Runs an in-process acceptor/initiator pair from the example configuration
+     * and checks, in order, the events raised for logon, for one application
+     * message, and for logoff.
+     */
+    @Test
+    public void sessionEvents() throws Exception {
+        final SessionID acceptorId = new SessionID(FixVersions.BEGINSTRING_FIX42, "MARKET", "TRADER");
+        final SessionID initiatorId = new SessionID(FixVersions.BEGINSTRING_FIX42, "TRADER", "MARKET");
+
+        quickfixjEngine = new QuickfixjEngine("examples/inprocess.cfg", false);
+
+        // The three phases share the engine and must run in this order.
+        doLogonEventsTest(acceptorId, initiatorId, quickfixjEngine);
+        doApplicationMessageEventsTest(acceptorId, initiatorId, quickfixjEngine);
+        doLogoffEventsTest(acceptorId, initiatorId, quickfixjEngine);
+    }
+
+    /**
+     * Starts the engine and verifies the sequence of events recorded until both
+     * sessions have logged on.
+     *
+     * @param acceptorSessionID  session ID of the accepting side
+     * @param initiatorSessionID session ID of the initiating side
+     * @param quickfixjEngine    engine to start and observe
+     */
+    private void doLogonEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+        throws Exception {
+
+        // The listener is invoked for both sessions (possibly concurrently on engine
+        // threads) while this thread reads the list, so the list must be thread-safe.
+        final List<EventRecord> events = Collections.synchronizedList(new ArrayList<EventRecord>());
+        final CountDownLatch logonLatch = new CountDownLatch(2);
+
+        QuickfixjEventListener logonListener = new QuickfixjEventListener() {
+            public void onEvent(QuickfixjEventCategory eventCategory,
+                    SessionID sessionID, Message message) {
+                events.add(new EventRecord(eventCategory, sessionID, message));
+                if (eventCategory == QuickfixjEventCategory.SessionLogon) {
+                    logonLatch.countDown();
+                }
+            }
+        };
+
+        quickfixjEngine.addEventListener(logonListener);
+
+        quickfixjEngine.start();
+
+        // One count per session: wait until acceptor and initiator are both logged on.
+        assertTrue("Logons not completed", logonLatch.await(5000, TimeUnit.MILLISECONDS));
+        quickfixjEngine.removeEventListener(logonListener);
+
+        assertThat(events.size(), is(7));
+
+        int n = 0;
+
+        // Initiator session is created and sends its logon request.
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionCreated));
+        assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+        assertThat(events.get(n).getMessage(), is(nullValue()));
+        n++;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+        assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+        assertThat(events.get(n).getMessage(), is(notNullValue()));
+        n++;
+
+        // Acceptor receives the request, replies and reports logon.
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+        assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+        assertThat(events.get(n).getMessage(), is(notNullValue()));
+        n++;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+        assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+        assertThat(events.get(n).getMessage(), is(notNullValue()));
+        n++;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+        assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+        assertThat(events.get(n).getMessage(), is(nullValue()));
+        n++;
+
+        // Initiator receives the reply and reports logon.
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+        assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+        assertThat(events.get(n).getMessage(), is(notNullValue()));
+        n++;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+        assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+        assertThat(events.get(n).getMessage(), is(nullValue()));
+    }
+
+    /**
+     * Sends one e-mail application message from the initiator session and
+     * verifies that exactly a "sent" event (initiator) followed by a "received"
+     * event (acceptor) is recorded.
+     *
+     * @param acceptorSessionID  session ID of the accepting side
+     * @param initiatorSessionID session ID the message is sent on
+     * @param quickfixjEngine    running engine to observe
+     */
+    private void doApplicationMessageEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine) 
+        throws SessionNotFound, InterruptedException, FieldNotFound {
+
+        // Events are recorded from listener callbacks and read on this thread,
+        // so the list must be thread-safe.
+        final List<EventRecord> events = Collections.synchronizedList(new ArrayList<EventRecord>());
+        final CountDownLatch messageLatch = new CountDownLatch(1);
+
+        QuickfixjEventListener messageListener = new QuickfixjEventListener() {
+            public void onEvent(QuickfixjEventCategory eventCategory,
+                    SessionID sessionID, Message message) {
+                events.add(new EventRecord(eventCategory, sessionID, message));
+                if (eventCategory == QuickfixjEventCategory.AppMessageReceived) {
+                    messageLatch.countDown();
+                }
+            }
+        };
+
+        quickfixjEngine.addEventListener(messageListener);
+        Email email = TestSupport.createEmailMessage("Test");
+        Session.sendToTarget(email, initiatorSessionID);
+
+        assertTrue("Application message not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
+        quickfixjEngine.removeEventListener(messageListener);
+
+        assertThat(events.size(), is(2));
+
+        int n = 0;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageSent));
+        assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+        assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+        n++;
+
+        assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageReceived));
+        assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+        assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+    }
+
+    /**
+     * Stops the engine and checks the logoff events that a temporarily registered
+     * listener observes.
+     * <p>
+     * The test waits for two {@code SessionLogoff} events and then expects the first
+     * two recorded events to be logoffs without a message: the acceptor session
+     * first, the initiator session second.
+     *
+     * @param acceptorSessionID session expected to log off first
+     * @param initiatorSessionID session expected to log off second
+     * @param quickfixjEngine engine that is stopped and observed
+     * @throws Exception if stopping the engine fails or the wait is interrupted
+     */
+    private void doLogoffEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+        throws Exception {
+
+        final List<EventRecord> events = new ArrayList<EventRecord>();
+        final CountDownLatch logoffLatch = new CountDownLatch(2);
+
+        QuickfixjEventListener logoffListener = new QuickfixjEventListener() {
+            // The two sessions may report their logoff from different threads and
+            // ArrayList is not thread-safe, so recording is serialized.
+            public synchronized void onEvent(QuickfixjEventCategory eventCategory,
+                    SessionID sessionID, Message message) {
+                events.add(new EventRecord(eventCategory, sessionID, message));
+                if (eventCategory == QuickfixjEventCategory.SessionLogoff) {
+                    logoffLatch.countDown();
+                }
+            }
+        };
+
+        quickfixjEngine.addEventListener(logoffListener);
+
+        quickfixjEngine.stop();
+
+        assertTrue("Logoffs not received", logoffLatch.await(5000, TimeUnit.MILLISECONDS));
+        quickfixjEngine.removeEventListener(logoffListener);
+
+        EventRecord acceptorLogoff = events.get(0);
+        assertThat(acceptorLogoff.getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+        assertThat(acceptorLogoff.getSessionID(), is(acceptorSessionID));
+        assertThat(acceptorLogoff.getMessage(), is(nullValue()));
+
+        EventRecord initiatorLogoff = events.get(1);
+        assertThat(initiatorLogoff.getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+        assertThat(initiatorLogoff.getSessionID(), is(initiatorSessionID));
+        assertThat(initiatorLogoff.getMessage(), is(nullValue()));
+    }
+
+    /**
+     * Immutable record of a single engine event as seen by a test listener: the
+     * event category, the session it was reported for and the accompanying
+     * message, which is {@code null} for events that carry none.
+     * <p>
+     * Declared static because it needs no state from the enclosing test instance.
+     */
+    private static final class EventRecord {
+        private final QuickfixjEventCategory eventCategory;
+        private final SessionID sessionID;
+        private final Message message;
+
+        /**
+         * @param eventCategory category of the reported event
+         * @param sessionID session the event was reported for
+         * @param message message delivered with the event, may be {@code null}
+         */
+        public EventRecord(QuickfixjEventCategory eventCategory,
+                SessionID sessionID, Message message) {
+            this.eventCategory = eventCategory;
+            this.sessionID = sessionID;
+            this.message = message;
+        }
+
+        /** @return category of the reported event */
+        public QuickfixjEventCategory getEventCategory() {
+            return eventCategory;
+        }
+
+        /** @return session the event was reported for */
+        public SessionID getSessionID() {
+            return sessionID;
+        }
+
+        /** @return message delivered with the event, may be {@code null} */
+        public Message getMessage() {
+            return message;
+        }
+
+        @Override
+        public String toString() {
+            return "EventRecord [eventCategory=" + eventCategory
+                    + ", sessionID=" + sessionID + ", message=" + message + "]";
+        }
+    }
+    
+    /**
+     * Verifies that the engine was built with its default collaborators: the
+     * settings resource name matches the test settings file, the message store,
+     * log and message factories are the in-memory, screen and default
+     * implementations, and no MBean is registered in the {@code org.quickfixj}
+     * JMX domain of the platform MBean server.
+     *
+     * @param quickfixjEngine engine to inspect
+     * @throws Exception if the JMX object name pattern cannot be created
+     */
+    private void assertDefaultConfiguration(QuickfixjEngine quickfixjEngine) throws Exception {
+        assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+        assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+        assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+        assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+
+        // JMX is expected to be off by default, so the QFJ domain must be empty
+        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectName> names = mbeanServer.queryNames(new ObjectName("org.quickfixj:*"), null);
+        assertTrue("QFJ mbean should not be registered", names.isEmpty());
+    }
+
+    /**
+     * Writes the test's current session settings to the test's settings file by
+     * delegating to {@link TestSupport#writeSettings}.
+     *
+     * @throws IOException if the settings file cannot be written
+     */
+    private void writeSettings() throws IOException {
+        TestSupport.writeSettings(settings, settingsFile);
+    }
+        
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.SessionID;
+
+/**
+ * Unit test for the error handling of {@link QuickfixjProducer}, using Mockito
+ * mocks for the endpoint, the exchange and the Camel message.
+ */
+public class QuickfixjProducerTest {
+    /**
+     * Processes an exchange carrying an empty FIX message for the session
+     * FIX.4.4:SENDER->TARGET and verifies that the producer reports an
+     * {@link IllegalStateException} on the exchange.
+     * NOTE(review): presumably the failure is caused by the session not being
+     * registered in this test - confirm against QuickfixjProducer.
+     */
+    @Test
+    public void setExceptionOnExchange() throws Exception {
+        // Endpoint bound to a fixed session ID
+        SessionID target = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
+        QuickfixjEndpoint endpoint = Mockito.mock(QuickfixjEndpoint.class);
+        Mockito.when(endpoint.getSessionID()).thenReturn(target);
+
+        // Exchange whose in-message body converts to an empty FIX message
+        org.apache.camel.Message inMessage = Mockito.mock(org.apache.camel.Message.class);
+        Mockito.when(inMessage.getBody(Message.class)).thenReturn(new Message());
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Mockito.when(exchange.getIn()).thenReturn(inMessage);
+
+        new QuickfixjProducer(endpoint).process(exchange);
+
+        Mockito.verify(exchange).setException(Matchers.isA(IllegalStateException.class));
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import quickfix.SessionID;
+import quickfix.SessionSettings;
+import quickfix.field.EmailThreadID;
+import quickfix.field.EmailType;
+import quickfix.field.Subject;
+import quickfix.field.Text;
+import quickfix.fix42.Email;
+
+/**
+ * Static helpers shared by the QuickFIX/J component tests and examples.
+ */
+public final class TestSupport {
+    private TestSupport() {
+        // Not instantiable: static helpers only
+    }
+
+    /**
+     * Serializes the given session settings into a file.
+     *
+     * @param settings settings to write
+     * @param settingsFile destination file
+     * @throws IOException if the file cannot be opened, written or closed
+     */
+    public static void writeSettings(SessionSettings settings, File settingsFile) throws IOException {
+        FileOutputStream out = new FileOutputStream(settingsFile);
+        try {
+            settings.toStream(out);
+        } finally {
+            // Always release the file handle, even when serialization fails
+            out.close();
+        }
+    }
+
+    /**
+     * Stores the begin string, sender comp ID and target comp ID of a session ID
+     * as settings of that session.
+     *
+     * @param sessionSettings settings to update
+     * @param sessionID session whose identifying values are stored
+     */
+    public static void setSessionID(SessionSettings sessionSettings, SessionID sessionID) {
+        String beginString = sessionID.getBeginString();
+        sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, beginString);
+
+        String senderCompId = sessionID.getSenderCompID();
+        sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, senderCompId);
+
+        String targetCompId = sessionID.getTargetCompID();
+        sessionSettings.setString(sessionID, SessionSettings.TARGETCOMPID, targetCompId);
+    }
+
+    /**
+     * Builds a FIX 4.2 email message with thread ID "ID", type NEW, the given
+     * subject and a single line of text "Content".
+     *
+     * @param subject subject of the email
+     * @return the populated email message
+     */
+    public static Email createEmailMessage(String subject) {
+        EmailThreadID threadId = new EmailThreadID("ID");
+        EmailType type = new EmailType(EmailType.NEW);
+        Email message = new Email(threadId, type, new Subject(subject));
+
+        Email.LinesOfText line = new Email.LinesOfText();
+        line.set(new Text("Content"));
+        message.addGroup(line);
+
+        return message;
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.TestSupport;
+import org.apache.camel.component.quickfixj.examples.routing.FixMessageRouter;
+import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter;
+import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.field.DeliverToCompID;
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+/**
+ * Example of dynamic routing through a FIX gateway. An email sent on the
+ * TRADER@1 session with a {@code DeliverToCompID} of TRADER@2 is routed by
+ * {@link FixMessageRouter}; the example completes once the TRADER@2 session
+ * receives it.
+ */
+public class DynamicRoutingExample {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicRoutingExample.class);
+
+    public static void main(String[] args) throws Exception {
+        new DynamicRoutingExample().sendMessage();
+    }
+
+    /**
+     * Starts a Camel context with the gateway routes, waits for the four session
+     * logons, sends one routed email and waits until it reaches its target.
+     * The Camel context is stopped on every exit path once it has been started.
+     *
+     * @throws IllegalStateException if logon or delivery does not complete within
+     *         five seconds
+     * @throws Exception if the Camel context or the producer fails
+     */
+    public void sendMessage() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext();
+
+        final CountDownLatch logonLatch = new CountDownLatch(4);
+        final CountDownLatch receivedMessageLatch = new CountDownLatch(1);
+
+        RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Release latch when session logon events are received
+                // We expect four logon events (four sessions)
+                from("quickfixj:examples/gateway.cfg").
+                    filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+                    bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+                // Dynamic router -- Uses FIX DeliverTo tags
+                from("quickfixj:examples/gateway.cfg").
+                    filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived)).
+                    recipientList(bean(new FixMessageRouter("quickfixj:examples/gateway.cfg")));
+
+                // Log app messages as JSON
+                from("quickfixj:examples/gateway.cfg").
+                    filter(PredicateBuilder.or(
+                            header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived),
+                            header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageSent))).
+                    bean(new QuickfixjMessageJsonPrinter());
+
+                // If the trader@2 session receives an email then release the latch
+                from("quickfixj:examples/gateway.cfg?sessionID=FIX.4.2:TRADER@2->GATEWAY").
+                    filter(PredicateBuilder.and(
+                            header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived),
+                            header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL))).
+                    bean(new CountDownLatchDecrementer("message", receivedMessageLatch));
+            }
+        };
+
+        context.addRoutes(routes);
+
+        LOG.info("Starting Camel context");
+        context.start();
+
+        // From here on the context must be stopped on every exit path; otherwise a
+        // timeout would leave the started context and its routes running.
+        try {
+            // This is not strictly necessary, but it prevents the need for session
+            // synchronization due to app messages being sent before being logged on
+            if (!logonLatch.await(5, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Logon did not complete");
+            }
+
+            String gatewayUri = "quickfixj:examples/gateway.cfg?sessionID=FIX.4.2:TRADER@1->GATEWAY";
+            Endpoint gatewayEndpoint = context.getEndpoint(gatewayUri);
+            Producer producer = gatewayEndpoint.createProducer();
+
+            Email email = TestSupport.createEmailMessage("Dynamic Routing Example");
+            email.getHeader().setString(DeliverToCompID.FIELD, "TRADER@2");
+
+            LOG.info("Sending routed message");
+
+            Exchange exchange = producer.createExchange(ExchangePattern.InOnly);
+            exchange.getIn().setBody(email);
+            producer.process(exchange);
+
+            if (!receivedMessageLatch.await(5, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Message did not reach target");
+            }
+
+            LOG.info("Message received, shutting down Camel context");
+        } finally {
+            context.stop();
+        }
+
+        LOG.info("Dynamic routing example complete");
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.PredicateBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.TestSupport;
+import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter;
+import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+/**
+ * This example demonstrates several features of the QuickFIX/J component. It uses
+ * QFJ session events to synchronize application behavior (e.g., Session logon):
+ * an email is sent on the TRADER->MARKET session only after both sessions have
+ * logged on, and the example completes once the MARKET->TRADER session sees it.
+ */
+public class SimpleMessagingExample {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessagingExample.class);
+
+    public static void main(String[] args) throws Exception {
+        new SimpleMessagingExample().sendMessage();
+    }
+
+    /**
+     * Starts a Camel context with the in-process routes, waits for the two session
+     * logons, sends one email and waits until the market session receives it.
+     * The Camel context is stopped on every exit path once it has been started.
+     *
+     * @throws IllegalStateException if logon or delivery does not complete within
+     *         five seconds
+     * @throws Exception if the Camel context or the producer fails
+     */
+    public void sendMessage() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext();
+
+        final CountDownLatch logonLatch = new CountDownLatch(2);
+        final CountDownLatch receivedMessageLatch = new CountDownLatch(1);
+
+        RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Release latch when session logon events are received
+                // We expect two events, one for the trader session and one for the market session
+                from("quickfixj:examples/inprocess.cfg").
+                    filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+                    bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+                // For all received messages, print the JSON-formatted message to stdout
+                from("quickfixj:examples/inprocess.cfg").
+                    filter(PredicateBuilder.or(
+                            header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AdminMessageReceived),
+                            header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived))).
+                    bean(new QuickfixjMessageJsonPrinter());
+
+                // If the market session receives an email then release the latch
+                from("quickfixj:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER").
+                    filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL)).
+                    bean(new CountDownLatchDecrementer("message", receivedMessageLatch));
+            }
+        };
+
+        context.addRoutes(routes);
+
+        LOG.info("Starting Camel context");
+        context.start();
+
+        // From here on the context must be stopped on every exit path; otherwise a
+        // timeout would leave the started context and its routes running.
+        try {
+            if (!logonLatch.await(5L, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Logon did not succeed");
+            }
+
+            String marketUri = "quickfixj:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET";
+            Producer producer = context.getEndpoint(marketUri).createProducer();
+
+            Email email = TestSupport.createEmailMessage("Example");
+            Exchange exchange = producer.createExchange(ExchangePattern.InOnly);
+            exchange.getIn().setBody(email);
+            producer.process(exchange);
+
+            if (!receivedMessageLatch.await(5L, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Message did not reach market");
+            }
+
+            LOG.info("Message received, shutting down Camel context");
+        } finally {
+            context.stop();
+        }
+
+        LOG.info("Example complete");
+    }
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.routing;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.FieldMap;
+import quickfix.Message;
+import quickfix.Message.Header;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.DeliverToCompID;
+import quickfix.field.DeliverToLocationID;
+import quickfix.field.DeliverToSubID;
+import quickfix.field.OnBehalfOfCompID;
+import quickfix.field.OnBehalfOfLocationID;
+import quickfix.field.OnBehalfOfSubID;
+import quickfix.field.SenderCompID;
+import quickfix.field.SenderLocationID;
+import quickfix.field.SenderSubID;
+import quickfix.field.TargetCompID;
+import quickfix.field.TargetLocationID;
+import quickfix.field.TargetSubID;
+
+/**
+ * Routes exchanges based on FIX-specific routing fields in the message.
+ * <p>
+ * When the message header carries a {@code DeliverToCompID}, the router returns
+ * the engine URI for the session leading to that counterparty and rewrites the
+ * header in place: the {@code DeliverTo*} fields are removed and the
+ * {@code Sender*} values are copied into the matching {@code OnBehalfOf*} fields.
+ */
+public class FixMessageRouter {
+    private static final Logger LOG = LoggerFactory.getLogger(FixMessageRouter.class);
+
+    private final String engineUri;
+
+    /**
+     * @param engineUri base URI of the QuickFIX/J engine endpoint; the destination
+     *        session is appended as a {@code sessionID} query parameter
+     */
+    public FixMessageRouter(String engineUri) {
+        this.engineUri = engineUri;
+    }
+
+    /**
+     * Computes the destination endpoint URI for the FIX message in the exchange.
+     *
+     * @param exchange exchange whose in-message body is converted to a FIX message
+     * @return the destination URI, or {@code null} when the body is not a FIX
+     *         message or the message has no {@code DeliverToCompID}
+     */
+    public String route(Exchange exchange) {
+        Message message = exchange.getIn().getBody(Message.class);
+        if (message == null) {
+            return null;
+        }
+
+        SessionID destinationSession = getDestinationSessionID(message);
+        if (destinationSession == null) {
+            return null;
+        }
+
+        String destinationUri = String.format("%s?sessionID=%s", engineUri, destinationSession);
+        LOG.debug("Routing destination: {}", destinationUri);
+        return destinationUri;
+    }
+
+    /**
+     * Derives the outgoing session from the DeliverTo fields and rewrites the
+     * header for forwarding. Returns {@code null}, leaving the message untouched,
+     * when no {@code DeliverToCompID} is present.
+     */
+    private SessionID getDestinationSessionID(Message message) {
+        Header header = message.getHeader();
+        String destinationCompId = getField(header, DeliverToCompID.FIELD);
+        if (destinationCompId == null) {
+            return null;
+        }
+
+        String fixVersion = getField(header, BeginString.FIELD);
+        String destinationSubId = getField(header, DeliverToSubID.FIELD);
+        String destinationLocationId = getField(header, DeliverToLocationID.FIELD);
+
+        // The routing instruction has been consumed; do not forward it
+        header.removeField(DeliverToCompID.FIELD);
+        header.removeField(DeliverToSubID.FIELD);
+        header.removeField(DeliverToLocationID.FIELD);
+
+        // The target of the incoming message (the gateway) is the sender of the
+        // forwarded one
+        String gatewayCompId = getField(header, TargetCompID.FIELD);
+        String gatewaySubId = getField(header, TargetSubID.FIELD);
+        String gatewayLocationId = getField(header, TargetLocationID.FIELD);
+
+        // Record the original sender; absent fields are skipped rather than
+        // written as null values
+        copyIfSet(header, SenderCompID.FIELD, OnBehalfOfCompID.FIELD);
+        copyIfSet(header, SenderSubID.FIELD, OnBehalfOfSubID.FIELD);
+        copyIfSet(header, SenderLocationID.FIELD, OnBehalfOfLocationID.FIELD);
+
+        return new SessionID(fixVersion, gatewayCompId, gatewaySubId, gatewayLocationId,
+            destinationCompId, destinationSubId, destinationLocationId, null);
+    }
+
+    /** Copies the value of {@code sourceTag} to {@code targetTag} when the source is readable. */
+    private void copyIfSet(FieldMap fieldMap, int sourceTag, int targetTag) {
+        String value = getField(fieldMap, sourceTag);
+        if (value != null) {
+            fieldMap.setString(targetTag, value);
+        }
+    }
+
+    /** Returns the string value of {@code tag}, or {@code null} when it is not set or unreadable. */
+    private String getField(FieldMap fieldMap, int tag) {
+        if (fieldMap.isSetField(tag)) {
+            try {
+                return fieldMap.getString(tag);
+            } catch (Exception e) {
+                // Not expected because the tag was just checked, but never hide it silently
+                LOG.warn("Unable to read FIX field " + tag, e);
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+/**
+ * Trivial market data provider interface to allow plugins for
+ * alternative market data sources.
+ * <p>
+ * Quotes are looked up by symbol and returned as {@code double} values; units
+ * and currency are not specified by this interface.
+ */
+public interface MarketQuoteProvider {
+    /**
+     * @param symbol symbol to look up
+     * @return the bid quote for the symbol
+     */
+    double getBid(String symbol);
+    /**
+     * @param symbol symbol to look up
+     * @return the ask quote for the symbol
+     */
+    double getAsk(String symbol);
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+/**
+ * Callback interface for handling a QuickFIX/J message together with the ID of
+ * the session it is associated with.
+ * NOTE(review): who invokes the callback, and whether the message is inbound or
+ * outbound, is not visible here - confirm against the implementing classes.
+ */
+public interface QuickfixjMessageListener {
+    /**
+     * Handles one message.
+     *
+     * @param sessionID ID of the session associated with the message
+     * @param message the FIX message to handle
+     * @throws Exception if the implementation fails to handle the message
+     */
+    void onMessage(SessionID sessionID, Message message) throws Exception;
+}

Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java?rev=1002361&view=auto
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java (added)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java Tue Sep 28 21:26:58 2010
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples.trading;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.ConfigError;
+import quickfix.DataDictionary;
+import quickfix.DataDictionaryProvider;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FixVersions;
+import quickfix.IncorrectTagValue;
+import quickfix.LogUtil;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+import quickfix.SessionNotFound;
+import quickfix.UnsupportedMessageType;
+import quickfix.field.ApplVerID;
+import quickfix.field.AvgPx;
+import quickfix.field.CumQty;
+import quickfix.field.ExecID;
+import quickfix.field.ExecTransType;
+import quickfix.field.ExecType;
+import quickfix.field.LastPx;
+import quickfix.field.LastQty;
+import quickfix.field.LastShares;
+import quickfix.field.LeavesQty;
+import quickfix.field.OrdStatus;
+import quickfix.field.OrdType;
+import quickfix.field.OrderID;
+import quickfix.field.OrderQty;
+import quickfix.field.Price;
+import quickfix.field.Side;
+import quickfix.field.Symbol;
+
+/**
+ * Trade executor based on QFJ example "executor" (No Camel dependencies)
+ */
+public class TradeExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(TradeExecutor.class);
+    
+    // When true, LIMIT orders are priced at their own limit price instead of a market quote.
+    private boolean alwaysFillLimitOrders;
+    // OrdType codes, held as one-character strings, that validateOrder accepts.
+    private Set<String> validOrderTypes = new HashSet<String>();
+    // Supplies bid/ask prices for orders that are not priced at their limit.
+    private  MarketQuoteProvider marketQuoteProvider;
+
+    // Receivers of every execution report that passes outgoing validation.
+    private List<QuickfixjMessageListener> listeners = new CopyOnWriteArrayList<QuickfixjMessageListener>();
+    
+    // Counters behind genOrderID()/genExecID(); they are incremented without synchronization.
+    private int orderID;
+    private int execID;
+    
+    /**
+     * Creates an executor with the example defaults: limit orders are always
+     * filled at their limit price, only LIMIT and MARKET order types are
+     * accepted, and quotes come from a provider that always answers 10.00.
+     *
+     * @throws ConfigError kept in the signature for callers; nothing in this constructor currently throws it
+     * @throws FieldConvertError kept in the signature for callers; nothing in this constructor currently throws it
+     */
+    public TradeExecutor() throws ConfigError, FieldConvertError {
+        setAlwaysFillLimitOrders(true);
+        // A plain HashSet avoids the anonymous subclass (and its hidden reference
+        // to this executor) that double-brace initialization would create.
+        setValidOrderTypes(new HashSet<String>(Arrays.asList(
+            String.valueOf(OrdType.LIMIT), String.valueOf(OrdType.MARKET))));
+        setMarketQuoteProvider(new DefaultMarketQuoteProvider(10.00));
+    }
+    
+    /**
+     * Controls how LIMIT orders are priced.
+     *
+     * @param alwaysFillLimitOrders when {@code true}, a LIMIT order is priced at its own
+     *            limit price; when {@code false}, the market quote provider is consulted
+     */
+    public void setAlwaysFillLimitOrders(boolean alwaysFillLimitOrders) {
+        this.alwaysFillLimitOrders = alwaysFillLimitOrders;
+    }
+    
+    /**
+     * Sets the source of bid/ask prices.
+     *
+     * @param marketQuoteProvider provider to query; with {@code null}, MARKET orders
+     *            are rejected during order validation
+     */
+    public void setMarketQuoteProvider(MarketQuoteProvider marketQuoteProvider) {
+        this.marketQuoteProvider = marketQuoteProvider;
+    }
+    
+    /**
+     * Sets the accepted order types from a comma-separated list of OrdType
+     * codes, for example {@code "1, 2"}. Whitespace around the commas and at
+     * either end of the string is ignored.
+     *
+     * @param validOrderTypes comma-separated OrdType codes; must not be {@code null}
+     */
+    public void setValidOrderTypes(String validOrderTypes) {
+        // Trim first: the separator pattern only absorbs whitespace around commas,
+        // so leading/trailing blanks would otherwise stay attached to the first
+        // and last codes, and those codes could never match an order's OrdType.
+        setValidOrderTypes(new HashSet<String>(Arrays.asList(validOrderTypes.trim().split("\\s*,\\s*"))));
+    }
+    
+    /**
+     * Sets the accepted order types.
+     *
+     * @param validOrderTypes OrdType codes as one-character strings; the set is stored
+     *            as given (not copied)
+     */
+    public void setValidOrderTypes(Set<String> validOrderTypes) {
+        this.validOrderTypes = validOrderTypes;
+    }
+    
+    /**
+     * Registers a listener that will receive the execution reports produced by this executor.
+     *
+     * @param listener the listener to add
+     */
+    public void addListener(QuickfixjMessageListener listener) {
+        listeners.add(listener);
+    }
+    
+    /**
+     * Unregisters a previously added listener.
+     *
+     * @param listener the listener to remove
+     */
+    public void removeListener(QuickfixjMessageListener listener) {
+        listeners.remove(listener);
+    }
+    
+    /**
+     * Entry point: routes a NewOrderSingle to the handler for its FIX version
+     * (4.0 through 5.0). Messages of any other type are ignored without
+     * notice.
+     * <p>
+     * Any exception raised by a handler is caught and logged here, so handler
+     * failures do not reach the caller.
+     *
+     * @param message the incoming FIX message
+     * @throws FieldNotFound declared in the signature; handler failures are logged rather than propagated
+     * @throws UnsupportedMessageType declared in the signature; handler failures are logged rather than propagated
+     * @throws IncorrectTagValue declared in the signature; handler failures are logged rather than propagated
+     */
+    public void execute(final Message message) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        final SessionID sessionID = MessageUtils.getSessionID(message);
+
+        try {
+            // The casts select the matching private onMessage overload at compile time.
+            if (message instanceof quickfix.fix40.NewOrderSingle) {
+                onMessage((quickfix.fix40.NewOrderSingle) message, sessionID);
+            } else if (message instanceof quickfix.fix41.NewOrderSingle) {
+                onMessage((quickfix.fix41.NewOrderSingle) message, sessionID);
+            } else if (message instanceof quickfix.fix42.NewOrderSingle) {
+                onMessage((quickfix.fix42.NewOrderSingle) message, sessionID);
+            } else if (message instanceof quickfix.fix43.NewOrderSingle) {
+                onMessage((quickfix.fix43.NewOrderSingle) message, sessionID);
+            } else if (message instanceof quickfix.fix44.NewOrderSingle) {
+                onMessage((quickfix.fix44.NewOrderSingle) message, sessionID);
+            } else if (message instanceof quickfix.fix50.NewOrderSingle) {
+                onMessage((quickfix.fix50.NewOrderSingle) message, sessionID);
+            }
+        } catch (Exception e) {
+            LOG.error("Error submitting execution task", e);
+        }
+    }
+    
+    /**
+     * Handles a FIX 4.0 NewOrderSingle: validates it, passes an acknowledgement
+     * (OrdStatus NEW) to {@code sendMessage} and, if the order is executable at
+     * the resolved price, passes a full fill (OrdStatus FILLED) as well.
+     * <p>
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix40.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+
+            OrderQty orderQty = order.getOrderQty();
+
+            Price price = getPrice(order);
+
+            // Acknowledgement: nothing executed yet, so last/cumulative quantities and prices are zero.
+            quickfix.fix40.ExecutionReport accept = new quickfix.fix40.ExecutionReport(genOrderID(), genExecID(),
+                    new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.NEW), order.getSymbol(), order.getSide(),
+                    orderQty, new LastShares(0), new LastPx(0), new CumQty(0), new AvgPx(0));
+
+            accept.set(order.getClOrdID());
+            sendMessage(sessionID, accept);
+
+            if (isOrderExecutable(order, price)) {
+                // NOTE(review): genOrderID() is called again, so the fill carries a different
+                // OrderID than the acknowledgement above - confirm this is intended.
+                quickfix.fix40.ExecutionReport fill = new quickfix.fix40.ExecutionReport(genOrderID(), genExecID(),
+                        new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order
+                                .getSide(), orderQty, new LastShares(orderQty.getValue()), new LastPx(price.getValue()),
+                        new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+                fill.set(order.getClOrdID());
+
+                sendMessage(sessionID, fill);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Decides whether an order can be filled at the given price.
+     * <p>
+     * Orders that are not LIMIT orders are always executable. A LIMIT buy is
+     * executable when the price is at or below the limit; a LIMIT sell or
+     * short sell is executable when the price is at or above the limit. Any
+     * other side on a LIMIT order is not executable.
+     *
+     * @param order the order being evaluated
+     * @param price the price the order would execute at
+     * @return {@code true} if the order may be filled at {@code price}
+     * @throws FieldNotFound if the order lacks a field that is read here
+     */
+    private boolean isOrderExecutable(Message order, Price price) throws FieldNotFound {
+        if (order.getChar(OrdType.FIELD) != OrdType.LIMIT) {
+            return true;
+        }
+
+        BigDecimal limit = new BigDecimal(order.getString(Price.FIELD));
+        char side = order.getChar(Side.FIELD);
+        // valueOf(double) goes through Double.toString, giving the same decimal value as before.
+        BigDecimal executionPrice = BigDecimal.valueOf(price.getValue());
+
+        int comparison = executionPrice.compareTo(limit);
+        boolean buying = side == Side.BUY;
+        boolean selling = side == Side.SELL || side == Side.SELL_SHORT;
+        return (buying && comparison <= 0) || (selling && comparison >= 0);
+    }
+
+    /**
+     * Resolves the price an order would execute at.
+     * <p>
+     * A LIMIT order is priced at its own limit when {@code alwaysFillLimitOrders}
+     * is set. Every other order is priced from the market quote provider: the
+     * ask for a buy, the bid for a sell or short sell.
+     *
+     * @param message the order to price
+     * @return the execution price
+     * @throws FieldNotFound if the order lacks a field that is read here
+     * @throws RuntimeException if a quote is needed but no provider is configured,
+     *             or if the order side is not buy, sell or sell short
+     */
+    private Price getPrice(Message message) throws FieldNotFound {
+        if (message.getChar(OrdType.FIELD) == OrdType.LIMIT && alwaysFillLimitOrders) {
+            return new Price(message.getDouble(Price.FIELD));
+        }
+
+        if (marketQuoteProvider == null) {
+            throw new RuntimeException("No market data provider specified for market order");
+        }
+
+        char side = message.getChar(Side.FIELD);
+        if (side == Side.BUY) {
+            return new Price(marketQuoteProvider.getAsk(message.getString(Symbol.FIELD)));
+        }
+        if (side == Side.SELL || side == Side.SELL_SHORT) {
+            return new Price(marketQuoteProvider.getBid(message.getString(Symbol.FIELD)));
+        }
+        throw new RuntimeException("Invalid order side: " + side);
+    }
+
+    /**
+     * Validates an outgoing message against the session's application data
+     * dictionary (when the session has a dictionary provider) and then hands
+     * it to every registered listener.
+     * <p>
+     * A message that fails validation is logged and dropped. A listener
+     * failure is logged and does not stop delivery to the remaining listeners.
+     * An unknown session is logged and nothing is dispatched.
+     *
+     * @param sessionID session the message belongs to
+     * @param message the outgoing message
+     */
+    private void sendMessage(SessionID sessionID, Message message) {
+        try {
+            Session target = Session.lookupSession(sessionID);
+            if (target == null) {
+                throw new SessionNotFound(sessionID.toString());
+            }
+
+            DataDictionaryProvider dictionaries = target.getDataDictionaryProvider();
+            if (dictionaries != null) {
+                try {
+                    DataDictionary dictionary =
+                        dictionaries.getApplicationDataDictionary(getApplVerID(target, message), null);
+                    dictionary.validate(message, true);
+                } catch (Exception validationFailure) {
+                    // An invalid report is logged and dropped; listeners never see it.
+                    String reason = "Outgoing message failed validation: " + validationFailure.getMessage();
+                    LogUtil.logThrowable(sessionID, reason, validationFailure);
+                    return;
+                }
+            }
+
+            for (QuickfixjMessageListener receiver : listeners) {
+                try {
+                    receiver.onMessage(sessionID, message);
+                } catch (Throwable dispatchFailure) {
+                    // One failing listener must not prevent delivery to the others.
+                    LogUtil.logThrowable(sessionID, "Error while dispatching message", dispatchFailure);
+                }
+            }
+        } catch (SessionNotFound notFound) {
+            LOG.error(notFound.getMessage(), notFound);
+        }
+    }
+
+    /**
+     * Determines the application version used to pick a data dictionary.
+     * A FIXT.1.1 session maps to FIX 5.0; any other session is mapped from
+     * its begin string.
+     *
+     * @param session the session whose begin string is inspected
+     * @param message not used by this method
+     * @return the application version id for the session
+     */
+    private ApplVerID getApplVerID(Session session, Message message) {
+        String beginString = session.getSessionID().getBeginString();
+        return FixVersions.BEGINSTRING_FIXT11.equals(beginString)
+            ? new ApplVerID(ApplVerID.FIX50)
+            : MessageUtils.toApplVerID(beginString);
+    }
+
+    /**
+     * Handles a FIX 4.1 NewOrderSingle: validates it, passes an acknowledgement
+     * (ExecType/OrdStatus NEW) to {@code sendMessage} and, if the order is
+     * executable at the resolved price, passes a full fill as well.
+     * <p>
+     * The acknowledgement reports the whole order quantity as LeavesQty, since
+     * nothing has executed at that point; the fill reports zero.
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix41.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+
+            OrderQty orderQty = order.getOrderQty();
+            Price price = getPrice(order);
+
+            // LeavesQty = OrderQty - CumQty; with CumQty 0 the full quantity is still open.
+            quickfix.fix41.ExecutionReport accept = new quickfix.fix41.ExecutionReport(genOrderID(), genExecID(),
+                    new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order
+                            .getSymbol(), order.getSide(), orderQty, new LastShares(0), new LastPx(0),
+                    new LeavesQty(orderQty.getValue()), new CumQty(0), new AvgPx(0));
+
+            accept.set(order.getClOrdID());
+            sendMessage(sessionID, accept);
+
+            if (isOrderExecutable(order, price)) {
+                // Full fill: everything executed at the resolved price, nothing left open.
+                quickfix.fix41.ExecutionReport executionReport = new quickfix.fix41.ExecutionReport(genOrderID(),
+                        genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL), new OrdStatus(
+                                OrdStatus.FILLED), order.getSymbol(), order.getSide(), orderQty, new LastShares(orderQty
+                                .getValue()), new LastPx(price.getValue()), new LeavesQty(0), new CumQty(orderQty
+                                .getValue()), new AvgPx(price.getValue()));
+
+                executionReport.set(order.getClOrdID());
+
+                sendMessage(sessionID, executionReport);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Handles a FIX 4.2 NewOrderSingle: validates it, passes an acknowledgement
+     * (ExecType/OrdStatus NEW) to {@code sendMessage} and, if the order is
+     * executable at the resolved price, passes a full fill as well.
+     * <p>
+     * The acknowledgement reports the whole order quantity as LeavesQty, since
+     * nothing has executed at that point; the fill reports zero.
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix42.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+
+            OrderQty orderQty = order.getOrderQty();
+            Price price = getPrice(order);
+
+            // LeavesQty = OrderQty - CumQty; with CumQty 0 the full quantity is still open.
+            quickfix.fix42.ExecutionReport accept = new quickfix.fix42.ExecutionReport(genOrderID(), genExecID(),
+                    new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order
+                            .getSymbol(), order.getSide(), new LeavesQty(orderQty.getValue()), new CumQty(0), new AvgPx(0));
+
+            accept.set(order.getClOrdID());
+            sendMessage(sessionID, accept);
+
+            if (isOrderExecutable(order, price)) {
+                quickfix.fix42.ExecutionReport executionReport = new quickfix.fix42.ExecutionReport(genOrderID(),
+                    genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL),
+                    new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order.getSide(), new LeavesQty(0),
+                    new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+
+                // These fields are not constructor arguments in the 4.2 report, so set them explicitly.
+                executionReport.set(order.getClOrdID());
+                executionReport.set(orderQty);
+                executionReport.set(new LastShares(orderQty.getValue()));
+                executionReport.set(new LastPx(price.getValue()));
+
+                sendMessage(sessionID, executionReport);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Rejects orders this executor cannot handle: those whose order type is
+     * not in the configured set of valid types, and MARKET orders when no
+     * market quote provider is configured.
+     *
+     * @param order the order to check
+     * @throws IncorrectTagValue if the order type is not acceptable
+     * @throws FieldNotFound if the order carries no order type
+     */
+    private void validateOrder(Message order) throws IncorrectTagValue, FieldNotFound {
+        OrdType ordType = new OrdType(order.getChar(OrdType.FIELD));
+        char typeCode = ordType.getValue();
+
+        boolean accepted = validOrderTypes.contains(Character.toString(typeCode));
+        if (!accepted) {
+            LOG.error("Order type not in ValidOrderTypes setting");
+            throw new IncorrectTagValue(ordType.getField());
+        }
+
+        boolean marketOrderWithoutQuotes = typeCode == OrdType.MARKET && marketQuoteProvider == null;
+        if (marketOrderWithoutQuotes) {
+            LOG.error("DefaultMarketPrice setting not specified for market order");
+            throw new IncorrectTagValue(ordType.getField());
+        }
+    }
+
+    /**
+     * Handles a FIX 4.3 NewOrderSingle: validates it, passes an acknowledgement
+     * (ExecType/OrdStatus NEW, LeavesQty equal to the order quantity) to
+     * {@code sendMessage} and, if the order is executable at the resolved
+     * price, passes a full fill as well.
+     * <p>
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix43.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+    
+            OrderQty orderQty = order.getOrderQty();
+            Price price = getPrice(order);
+    
+            quickfix.fix43.ExecutionReport accept = new quickfix.fix43.ExecutionReport(
+                genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), 
+                order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0));
+    
+            // ClOrdID and Symbol are not constructor arguments here, so they are set separately.
+            accept.set(order.getClOrdID());
+            accept.set(order.getSymbol());
+            sendMessage(sessionID, accept);
+    
+            if (isOrderExecutable(order, price)) {
+                // Full fill: the whole quantity executes at the resolved price, nothing left open.
+                quickfix.fix43.ExecutionReport executionReport = new quickfix.fix43.ExecutionReport(genOrderID(),
+                    genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(),
+                    new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+    
+                executionReport.set(order.getClOrdID());
+                executionReport.set(order.getSymbol());
+                executionReport.set(orderQty);
+                executionReport.set(new LastQty(orderQty.getValue()));
+                executionReport.set(new LastPx(price.getValue()));
+    
+                sendMessage(sessionID, executionReport);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Handles a FIX 4.4 NewOrderSingle: validates it, passes an acknowledgement
+     * (ExecType/OrdStatus NEW, LeavesQty equal to the order quantity) to
+     * {@code sendMessage} and, if the order is executable at the resolved
+     * price, passes a full fill as well.
+     * <p>
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix44.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+    
+            OrderQty orderQty = order.getOrderQty();
+            Price price = getPrice(order);
+    
+            quickfix.fix44.ExecutionReport accept = new quickfix.fix44.ExecutionReport(
+                genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), 
+                order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0));
+    
+            // ClOrdID and Symbol are not constructor arguments here, so they are set separately.
+            accept.set(order.getClOrdID());
+            accept.set(order.getSymbol());
+            sendMessage(sessionID, accept);
+    
+            if (isOrderExecutable(order, price)) {
+                // Full fill: the whole quantity executes at the resolved price, nothing left open.
+                quickfix.fix44.ExecutionReport executionReport = new quickfix.fix44.ExecutionReport(genOrderID(),
+                    genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(),
+                    new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue()));
+    
+                executionReport.set(order.getClOrdID());
+                executionReport.set(order.getSymbol());
+                executionReport.set(orderQty);
+                executionReport.set(new LastQty(orderQty.getValue()));
+                executionReport.set(new LastPx(price.getValue()));
+    
+                sendMessage(sessionID, executionReport);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Handles a FIX 5.0 NewOrderSingle: validates it, passes an acknowledgement
+     * (ExecType/OrdStatus NEW, LeavesQty equal to the order quantity) to
+     * {@code sendMessage} and, if the order is executable at the resolved
+     * price, passes a full fill as well.
+     * <p>
+     * Runtime exceptions are logged against the session and not rethrown.
+     *
+     * @param order the incoming order
+     * @param sessionID session the order arrived on
+     * @throws FieldNotFound if a field read from the order is missing
+     * @throws UnsupportedMessageType declared in the signature
+     * @throws IncorrectTagValue if the order type fails validation
+     */
+    private void onMessage(quickfix.fix50.NewOrderSingle order, SessionID sessionID) throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
+        try {
+            validateOrder(order);
+
+            OrderQty orderQty = order.getOrderQty();
+            Price price = getPrice(order);
+
+            // Unlike the earlier versions, AvgPx is not passed to this constructor;
+            // the acknowledgement is sent without it.
+            quickfix.fix50.ExecutionReport accept = new quickfix.fix50.ExecutionReport(
+                genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), 
+                order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0));
+
+            accept.set(order.getClOrdID());
+            accept.set(order.getSymbol());
+            sendMessage(sessionID, accept);
+
+            if (isOrderExecutable(order, price)) {
+                quickfix.fix50.ExecutionReport executionReport = new quickfix.fix50.ExecutionReport(
+                    genOrderID(), genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), 
+                    order.getSide(), new LeavesQty(0), new CumQty(orderQty.getValue()));
+
+                // The fill carries the execution details, including AvgPx, as explicitly set fields.
+                executionReport.set(order.getClOrdID());
+                executionReport.set(order.getSymbol());
+                executionReport.set(orderQty);
+                executionReport.set(new LastQty(orderQty.getValue()));
+                executionReport.set(new LastPx(price.getValue()));
+                executionReport.set(new AvgPx(price.getValue()));
+                
+                sendMessage(sessionID, executionReport);
+            }
+        } catch (RuntimeException e) {
+            LogUtil.logThrowable(sessionID, e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * Produces the next order identifier: the decimal text of an internal
+     * counter that is incremented on every call (the first value is "1").
+     *
+     * @return a new {@link OrderID}
+     */
+    public OrderID genOrderID() {
+        orderID++;
+        return new OrderID(Integer.toString(orderID));
+    }
+
+    /**
+     * Produces the next execution identifier: the decimal text of an internal
+     * counter that is incremented on every call (the first value is "1").
+     *
+     * @return a new {@link ExecID}
+     */
+    public ExecID genExecID() {
+        execID++;
+        return new ExecID(Integer.toString(execID));
+    }
+
+    /**
+     * Quote provider that answers every bid and ask request with the same
+     * fixed price, whatever the symbol.
+     */
+    private static class DefaultMarketQuoteProvider implements MarketQuoteProvider {
+        private final double defaultMarketPrice;
+
+        public DefaultMarketQuoteProvider(double defaultMarketPrice) {
+            this.defaultMarketPrice = defaultMarketPrice;
+        }
+
+        /** Returns the fixed price; the symbol is ignored. */
+        public double getAsk(String symbol) {
+            return defaultMarketPrice;
+        }
+
+        /** Returns the fixed price; the symbol is ignored. */
+        public double getBid(String symbol) {
+            return defaultMarketPrice;
+        }
+    }
+}
\ No newline at end of file