You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/10/14 21:38:13 UTC
svn commit: r1022678 - in /camel/trunk/components/camel-quickfix/src:
main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
Author: davsclaus
Date: Thu Oct 14 19:38:12 2010
New Revision: 1022678
URL: http://svn.apache.org/viewvc?rev=1022678&view=rev
Log:
CAMEL-3231: Applied patch with thanks to Beat Glattfelder.
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1022678&r1=1022677&r2=1022678&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original)
+++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Thu Oct 14 19:38:12 2010
@@ -86,6 +86,7 @@ public class QuickfixjEngine {
private final Acceptor acceptor;
private final Initiator initiator;
+ private final JmxExporter jmxExporter;
private final boolean forcedShutdown;
private final MessageStoreFactory messageStoreFactory;
private final LogFactory sessionLogFactory;
@@ -133,10 +134,11 @@ public class QuickfixjEngine {
threadModel = ThreadModel.valueOf(settings.getString(SETTING_THREAD_MODEL));
}
- JmxExporter jmxExporter = null;
if (settings.isSetting(SETTING_USE_JMX) && settings.getBool(SETTING_USE_JMX)) {
LOG.info("Enabling JMX for QuickFIX/J");
jmxExporter = new JmxExporter();
+ } else {
+ jmxExporter = null;
}
// From original component implementation...
@@ -146,24 +148,17 @@ public class QuickfixjEngine {
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
if (isConnectorRole(settings, SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
- acceptor = createAcceptor(new Dispatcher(), settings,
+ acceptor = createAcceptor(new Dispatcher(), settings,
messageStoreFactory, sessionLogFactory, messageFactory, threadModel);
-
- if (jmxExporter != null) {
- jmxExporter.export(acceptor);
- }
} else {
acceptor = null;
}
if (isConnectorRole(settings, SessionFactory.INITIATOR_CONNECTION_TYPE)) {
initiator = createInitiator(new Dispatcher(), settings,
- messageStoreFactory, sessionLogFactory, messageFactory, threadModel);
-
- if (jmxExporter != null) {
- jmxExporter.export(initiator);
- }
+ messageStoreFactory, sessionLogFactory, messageFactory, threadModel);
} else {
initiator = null;
}
@@ -179,9 +174,15 @@ public class QuickfixjEngine {
public void start() throws Exception {
if (acceptor != null) {
acceptor.start();
+ if (jmxExporter != null) {
+ jmxExporter.export(acceptor);
+ }
}
if (initiator != null) {
initiator.start();
+ if (jmxExporter != null) {
+ jmxExporter.export(initiator);
+ }
}
started = true;
}
Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java?rev=1022678&r1=1022677&r2=1022678&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java (original)
+++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Thu Oct 14 19:38:12 2010
@@ -1,539 +1,539 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.mina.common.TransportType;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import quickfix.Acceptor;
-import quickfix.ConfigError;
-import quickfix.DefaultMessageFactory;
-import quickfix.FieldConvertError;
-import quickfix.FieldNotFound;
-import quickfix.FileLogFactory;
-import quickfix.FileStoreFactory;
-import quickfix.FixVersions;
-import quickfix.Initiator;
-import quickfix.JdbcLogFactory;
-import quickfix.JdbcSetting;
-import quickfix.JdbcStoreFactory;
-import quickfix.MemoryStoreFactory;
-import quickfix.Message;
-import quickfix.ScreenLogFactory;
-import quickfix.Session;
-import quickfix.SessionFactory;
-import quickfix.SessionID;
-import quickfix.SessionNotFound;
-import quickfix.SessionSettings;
-import quickfix.SleepycatStoreFactory;
-import quickfix.SocketAcceptor;
-import quickfix.SocketInitiator;
-import quickfix.ThreadedSocketAcceptor;
-import quickfix.ThreadedSocketInitiator;
-import quickfix.field.MsgType;
-import quickfix.fix42.Email;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-public class QuickfixjEngineTest {
- private File settingsFile;
- private ClassLoader contextClassLoader;
- private SessionSettings settings;
- private SessionID sessionID;
- private File tempdir;
- private QuickfixjEngine quickfixjEngine;
-
- @Before
- public void setUp() throws Exception {
- settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
- tempdir = settingsFile.getParentFile();
- URL[] urls = new URL[] {tempdir.toURI().toURL()};
-
- contextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
- Thread.currentThread().setContextClassLoader(testClassLoader);
-
- sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
-
- settings = new SessionSettings();
- settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
- settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
- settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
- TestSupport.setSessionID(settings, sessionID);
- }
-
- @After
- public void tearDown() throws Exception {
- Thread.currentThread().setContextClassLoader(contextClassLoader);
- if (quickfixjEngine != null) {
- quickfixjEngine.stop();
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void missingSettingsResource() throws Exception {
- new QuickfixjEngine("bogus.cfg", false);
- }
-
- @Test
- public void defaultInitiator() throws Exception {
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), instanceOf(SocketInitiator.class));
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertDefaultConfiguration(quickfixjEngine);
- }
-
- @Test
- public void threadPerSessionInitiator() throws Exception {
- settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), instanceOf(ThreadedSocketInitiator.class));
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertDefaultConfiguration(quickfixjEngine);
- }
-
- @Test
- public void defaultAcceptor() throws Exception {
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), nullValue());
- assertThat(quickfixjEngine.getAcceptor(), instanceOf(SocketAcceptor.class));
- assertDefaultConfiguration(quickfixjEngine);
- }
-
- @Test
- public void threadPerSessionAcceptor() throws Exception {
- settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), nullValue());
- assertThat(quickfixjEngine.getAcceptor(), instanceOf(ThreadedSocketAcceptor.class));
- assertDefaultConfiguration(quickfixjEngine);
- }
-
- @Test
- public void minimalInitiatorAndAcceptor() throws Exception {
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
-
- SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FARGLE", "BARGLE");
- settings.setString(initiatorSessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
- TestSupport.setSessionID(settings, initiatorSessionID);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), notNullValue());
- assertDefaultConfiguration(quickfixjEngine);
- }
-
- @Test
- public void inferFileStore() throws Exception {
- settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(FileStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- }
-
- // NOTE This is a little strange. If the JDBC driver is set and no log settings are found,
- // then we use JDBC for both the message store and the log.
-
- @Test
- public void inferJdbcStoreAndLog() throws Exception {
- settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(JdbcLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- }
-
- @Test
- public void ambiguousMessageStore() throws Exception {
- settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
- settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- doAmbiguityTest("Ambiguous message store");
- }
-
- @Test
- public void inferJdbcStoreWithInferredLog() throws Exception {
- settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
- settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- }
-
- @Test
- public void inferSleepycatStore() throws Exception {
- settings.setString(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR, tempdir.toString());
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(SleepycatStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- }
-
- @Test
- public void inferFileLog() throws Exception {
- settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- assertThat(quickfixjEngine.getInitiator(), notNullValue());
- assertThat(quickfixjEngine.getAcceptor(), nullValue());
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(FileLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- }
-
- @Test
- public void ambiguousLog() throws Exception {
- settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
- settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- doAmbiguityTest("Ambiguous log");
- }
-
- private void doAmbiguityTest(String exceptionText) throws FieldConvertError, IOException, JMException {
- try {
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
- fail("Expected exception, but none raised");
- } catch (ConfigError e) {
- assertTrue(e.getMessage().contains(exceptionText));
- }
- }
-
- @Test
- public void enableJmxForInitiator() throws Exception {
- settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
- Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Initiator,*"), null);
- assertFalse("QFJ mbean not registered", n.isEmpty());
- }
-
- @Test
- public void enableJmxForAcceptor() throws Exception {
- settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
-
- writeSettings();
-
- quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
-
- MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
- Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Acceptor,*"), null);
- assertFalse("QFJ mbean not registered", n.isEmpty());
- }
-
- @Test
- public void sessionEvents() throws Exception {
- SessionID acceptorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "MARKET", "TRADER");
- SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TRADER", "MARKET");
-
- quickfixjEngine = new QuickfixjEngine("examples/inprocess.cfg", false);
-
- doLogonEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
-
- doApplicationMessageEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
-
- doLogoffEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
- }
-
- private void doLogonEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
- throws Exception, InterruptedException {
-
- final List<EventRecord> events = new ArrayList<EventRecord>();
- final CountDownLatch logonLatch = new CountDownLatch(2);
-
- QuickfixjEventListener logonListener = new QuickfixjEventListener() {
- public void onEvent(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) {
- events.add(new EventRecord(eventCategory, sessionID, message));
- if (eventCategory == QuickfixjEventCategory.SessionLogon) {
- logonLatch.countDown();
- }
- }
- };
-
- quickfixjEngine.addEventListener(logonListener);
-
- quickfixjEngine.start();
-
- assertTrue("Logons not completed", logonLatch.await(5000, TimeUnit.MILLISECONDS));
- quickfixjEngine.removeEventListener(logonListener);
-
- assertThat(events.size(), is(7));
-
- int n = 0;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionCreated));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage(), is(nullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage(), is(notNullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
- assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
- assertThat(events.get(n).getMessage(), is(notNullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
- assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
- assertThat(events.get(n).getMessage(), is(notNullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
- assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
- assertThat(events.get(n).getMessage(), is(nullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage(), is(notNullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage(), is(nullValue()));
- n++;
- }
-
- private void doApplicationMessageEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
- throws SessionNotFound, InterruptedException, FieldNotFound {
-
- final List<EventRecord> events = new ArrayList<EventRecord>();
- final CountDownLatch messageLatch = new CountDownLatch(1);
-
- QuickfixjEventListener messageListener = new QuickfixjEventListener() {
- public void onEvent(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) {
- EventRecord event = new EventRecord(eventCategory, sessionID, message);
- events.add(event);
- if (eventCategory == QuickfixjEventCategory.AppMessageReceived) {
- messageLatch.countDown();
- }
- }
- };
-
- quickfixjEngine.addEventListener(messageListener);
- Email email = TestSupport.createEmailMessage("Test");
- Session.sendToTarget(email, initiatorSessionID);
-
- assertTrue("Application message not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
- quickfixjEngine.removeEventListener(messageListener);
-
- assertThat(events.size(), is(2));
-
- int n = 0;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageSent));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageReceived));
- assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
- assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
- n++;
- }
-
- private void doLogoffEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
- throws Exception, InterruptedException {
-
- final List<EventRecord> events = new ArrayList<EventRecord>();
- final CountDownLatch logoffLatch = new CountDownLatch(2);
-
- QuickfixjEventListener logoffListener = new QuickfixjEventListener() {
- public void onEvent(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) {
- EventRecord event = new EventRecord(eventCategory, sessionID, message);
- events.add(event);
- if (eventCategory == QuickfixjEventCategory.SessionLogoff) {
- logoffLatch.countDown();
- }
- }
- };
-
- quickfixjEngine.addEventListener(logoffListener);
-
- quickfixjEngine.stop();
-
- assertTrue("Logoffs not received", logoffLatch.await(5000, TimeUnit.MILLISECONDS));
- quickfixjEngine.removeEventListener(logoffListener);
-
- int n = 0;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
- assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
- assertThat(events.get(n).getMessage(), is(nullValue()));
- n++;
-
- assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
- assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
- assertThat(events.get(n).getMessage(), is(nullValue()));
- n++;
- }
-
- private class EventRecord {
- private final QuickfixjEventCategory eventCategory;
- private final SessionID sessionID;
- private final Message message;
-
- public EventRecord(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) {
- super();
- this.eventCategory = eventCategory;
- this.sessionID = sessionID;
- this.message = message;
- }
-
- public QuickfixjEventCategory getEventCategory() {
- return eventCategory;
- }
-
- public SessionID getSessionID() {
- return sessionID;
- }
-
- public Message getMessage() {
- return message;
- }
-
- @Override
- public String toString() {
- return "EventRecord [eventCategory=" + eventCategory
- + ", sessionID=" + sessionID + ", message=" + message + "]";
- }
- }
-
- private void assertDefaultConfiguration(QuickfixjEngine quickfixjEngine) throws Exception {
- assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
- assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
- assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
- assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
- MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
- Set<ObjectName> names = mbeanServer.queryNames(new ObjectName("org.quickfixj:*"), null);
- assertTrue("QFJ mbean should not not registered", names.isEmpty());
- }
-
- private void writeSettings() throws IOException {
- TestSupport.writeSettings(settings, settingsFile);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.mina.common.TransportType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import quickfix.Acceptor;
+import quickfix.ConfigError;
+import quickfix.DefaultMessageFactory;
+import quickfix.FieldConvertError;
+import quickfix.FieldNotFound;
+import quickfix.FileLogFactory;
+import quickfix.FileStoreFactory;
+import quickfix.FixVersions;
+import quickfix.Initiator;
+import quickfix.JdbcLogFactory;
+import quickfix.JdbcSetting;
+import quickfix.JdbcStoreFactory;
+import quickfix.MemoryStoreFactory;
+import quickfix.Message;
+import quickfix.ScreenLogFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
+import quickfix.SessionID;
+import quickfix.SessionNotFound;
+import quickfix.SessionSettings;
+import quickfix.SleepycatStoreFactory;
+import quickfix.SocketAcceptor;
+import quickfix.SocketInitiator;
+import quickfix.ThreadedSocketAcceptor;
+import quickfix.ThreadedSocketInitiator;
+import quickfix.field.MsgType;
+import quickfix.fix42.Email;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class QuickfixjEngineTest {
+ private File settingsFile;
+ private ClassLoader contextClassLoader;
+ private SessionSettings settings;
+ private SessionID sessionID;
+ private File tempdir;
+ private QuickfixjEngine quickfixjEngine;
+
+ @Before
+ public void setUp() throws Exception {
+ settingsFile = File.createTempFile("quickfixj_test_", ".cfg");
+ tempdir = settingsFile.getParentFile();
+ URL[] urls = new URL[]{tempdir.toURI().toURL()};
+
+ contextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader);
+ Thread.currentThread().setContextClassLoader(testClassLoader);
+
+ sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR");
+
+ settings = new SessionSettings();
+ settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL, TransportType.VM_PIPE.toString());
+ settings.setString(Initiator.SETTING_SOCKET_CONNECT_PROTOCOL, TransportType.VM_PIPE.toString());
+ settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+ TestSupport.setSessionID(settings, sessionID);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ if (quickfixjEngine != null) {
+ quickfixjEngine.stop();
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void missingSettingsResource() throws Exception {
+ new QuickfixjEngine("bogus.cfg", false);
+ }
+
+ @Test
+ public void defaultInitiator() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), instanceOf(SocketInitiator.class));
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void threadPerSessionInitiator() throws Exception {
+ settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), instanceOf(ThreadedSocketInitiator.class));
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void defaultAcceptor() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), nullValue());
+ assertThat(quickfixjEngine.getAcceptor(), instanceOf(SocketAcceptor.class));
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void threadPerSessionAcceptor() throws Exception {
+ settings.setString(QuickfixjEngine.SETTING_THREAD_MODEL, QuickfixjEngine.ThreadModel.ThreadPerSession.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), nullValue());
+ assertThat(quickfixjEngine.getAcceptor(), instanceOf(ThreadedSocketAcceptor.class));
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void minimalInitiatorAndAcceptor() throws Exception {
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FARGLE", "BARGLE");
+ settings.setString(initiatorSessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ TestSupport.setSessionID(settings, initiatorSessionID);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), notNullValue());
+ assertDefaultConfiguration(quickfixjEngine);
+ }
+
+ @Test
+ public void inferFileStore() throws Exception {
+ settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(FileStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ // NOTE This is a little strange. If the JDBC driver is set and no log settings are found,
+ // then we use JDBC for both the message store and the log.
+
+ @Test
+ public void inferJdbcStoreAndLog() throws Exception {
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(JdbcLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void ambiguousMessageStore() throws Exception {
+ settings.setString(FileStoreFactory.SETTING_FILE_STORE_PATH, tempdir.toString());
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ doAmbiguityTest("Ambiguous message store");
+ }
+
+ @Test
+ public void inferJdbcStoreWithInferredLog() throws Exception {
+ settings.setString(JdbcSetting.SETTING_JDBC_DRIVER, "driver");
+ settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(JdbcStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void inferSleepycatStore() throws Exception {
+ settings.setString(SleepycatStoreFactory.SETTING_SLEEPYCAT_DATABASE_DIR, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(SleepycatStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void inferFileLog() throws Exception {
+ settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+
+ assertThat(quickfixjEngine.getInitiator(), notNullValue());
+ assertThat(quickfixjEngine.getAcceptor(), nullValue());
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(FileLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ }
+
+ @Test
+ public void ambiguousLog() throws Exception {
+ settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString());
+ settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+
+ writeSettings();
+
+ doAmbiguityTest("Ambiguous log");
+ }
+
+ private void doAmbiguityTest(String exceptionText) throws FieldConvertError, IOException, JMException {
+ try {
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+ fail("Expected exception, but none raised");
+ } catch (ConfigError e) {
+ assertTrue(e.getMessage().contains(exceptionText));
+ }
+ }
+
+ @Test
+ public void enableJmxForInitiator() throws Exception {
+ settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+ quickfixjEngine.start();
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Initiator,*"), null);
+ assertFalse("QFJ mbean not registered", n.isEmpty());
+ }
+
+ @Test
+ public void enableJmxForAcceptor() throws Exception {
+ settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true);
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 1234);
+
+ writeSettings();
+
+ quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false);
+ quickfixjEngine.start();
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> n = mbeanServer.queryNames(new ObjectName("org.quickfixj:type=Connector,role=Acceptor,*"), null);
+ assertFalse("QFJ mbean not registered", n.isEmpty());
+ }
+
+ @Test
+ public void sessionEvents() throws Exception {
+ SessionID acceptorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "MARKET", "TRADER");
+ SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TRADER", "MARKET");
+
+ quickfixjEngine = new QuickfixjEngine("examples/inprocess.cfg", false);
+
+ doLogonEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+
+ doApplicationMessageEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+
+ doLogoffEventsTest(acceptorSessionID, initiatorSessionID, quickfixjEngine);
+ }
+
+ private void doLogonEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws Exception {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch logonLatch = new CountDownLatch(2);
+
+ QuickfixjEventListener logonListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ events.add(new EventRecord(eventCategory, sessionID, message));
+ if (eventCategory == QuickfixjEventCategory.SessionLogon) {
+ logonLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(logonListener);
+
+ quickfixjEngine.start();
+
+ assertTrue("Logons not completed", logonLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(logonListener);
+
+ assertThat(events.size(), is(7));
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionCreated));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageSent));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AdminMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(notNullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogon));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+ }
+
+ private void doApplicationMessageEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws SessionNotFound, InterruptedException, FieldNotFound {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch messageLatch = new CountDownLatch(1);
+
+ QuickfixjEventListener messageListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ EventRecord event = new EventRecord(eventCategory, sessionID, message);
+ events.add(event);
+ if (eventCategory == QuickfixjEventCategory.AppMessageReceived) {
+ messageLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(messageListener);
+ Email email = TestSupport.createEmailMessage("Test");
+ Session.sendToTarget(email, initiatorSessionID);
+
+ assertTrue("Application message not received", messageLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(messageListener);
+
+ assertThat(events.size(), is(2));
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageSent));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.AppMessageReceived));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage().getHeader().getString(MsgType.FIELD), is(MsgType.EMAIL));
+ n++;
+ }
+
+ private void doLogoffEventsTest(SessionID acceptorSessionID, SessionID initiatorSessionID, final QuickfixjEngine quickfixjEngine)
+ throws Exception {
+
+ final List<EventRecord> events = new ArrayList<EventRecord>();
+ final CountDownLatch logoffLatch = new CountDownLatch(2);
+
+ QuickfixjEventListener logoffListener = new QuickfixjEventListener() {
+ public void onEvent(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ EventRecord event = new EventRecord(eventCategory, sessionID, message);
+ events.add(event);
+ if (eventCategory == QuickfixjEventCategory.SessionLogoff) {
+ logoffLatch.countDown();
+ }
+ }
+ };
+
+ quickfixjEngine.addEventListener(logoffListener);
+
+ quickfixjEngine.stop();
+
+ assertTrue("Logoffs not received", logoffLatch.await(5000, TimeUnit.MILLISECONDS));
+ quickfixjEngine.removeEventListener(logoffListener);
+
+ int n = 0;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+ assertThat(events.get(n).getSessionID(), is(acceptorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+
+ assertThat(events.get(n).getEventCategory(), is(QuickfixjEventCategory.SessionLogoff));
+ assertThat(events.get(n).getSessionID(), is(initiatorSessionID));
+ assertThat(events.get(n).getMessage(), is(nullValue()));
+ n++;
+ }
+
+ private class EventRecord {
+ private final QuickfixjEventCategory eventCategory;
+ private final SessionID sessionID;
+ private final Message message;
+
+ public EventRecord(QuickfixjEventCategory eventCategory,
+ SessionID sessionID, Message message) {
+ super();
+ this.eventCategory = eventCategory;
+ this.sessionID = sessionID;
+ this.message = message;
+ }
+
+ public QuickfixjEventCategory getEventCategory() {
+ return eventCategory;
+ }
+
+ public SessionID getSessionID() {
+ return sessionID;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return "EventRecord [eventCategory=" + eventCategory
+ + ", sessionID=" + sessionID + ", message=" + message + "]";
+ }
+ }
+
+ private void assertDefaultConfiguration(QuickfixjEngine quickfixjEngine) throws Exception {
+ assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName()));
+ assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class));
+ assertThat(quickfixjEngine.getLogFactory(), instanceOf(ScreenLogFactory.class));
+ assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class));
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectName> names = mbeanServer.queryNames(new ObjectName("org.quickfixj:*"), null);
+ assertTrue("QFJ mbean should not not registered", names.isEmpty());
+ }
+
+ private void writeSettings() throws IOException {
+ TestSupport.writeSettings(settings, settingsFile);
+ }
+
+}